2021-04-18 12:41:13 -07:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2020-07-21 17:49:56 -07:00
package cmd
import (
"context"
2022-02-10 10:16:52 -08:00
"encoding/binary"
"errors"
2020-09-15 20:44:48 -07:00
"fmt"
2021-06-30 07:44:24 -07:00
"io"
2021-11-19 14:46:14 -08:00
"math"
2020-07-21 17:49:56 -07:00
"net/http"
2022-02-10 10:16:52 -08:00
"path"
2021-02-03 20:41:33 -08:00
"reflect"
2020-08-12 17:32:24 -07:00
"strings"
2021-03-09 02:56:42 -08:00
"sync"
2020-07-21 17:49:56 -07:00
"time"
2022-01-11 22:32:29 -08:00
"github.com/dustin/go-humanize"
2021-05-06 08:52:02 -07:00
"github.com/minio/madmin-go"
2021-08-23 17:16:18 +02:00
"github.com/minio/minio-go/v7"
2020-07-21 17:49:56 -07:00
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
2021-06-01 14:59:40 -07:00
"github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event"
2021-06-30 07:44:24 -07:00
"github.com/minio/minio/internal/hash"
2021-06-01 14:59:40 -07:00
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
2020-07-21 17:49:56 -07:00
)
2021-09-18 16:31:35 -04:00
const (
	// throttleDeadline is a fixed one-hour duration.
	// NOTE(review): its consumers are outside this block; confirm what it bounds.
	throttleDeadline = 1 * time.Hour

	// ReplicationReset has reset id and timestamp of last reset operation
	ReplicationReset = "replication-reset"

	// ReplicationStatus has internal replication status - stringified representation of target's replication status for all replication
	// activity initiated from this cluster
	ReplicationStatus = "replication-status"

	// ReplicationTimestamp - the last time replication was initiated on this cluster for this object version
	ReplicationTimestamp = "replication-timestamp"

	// ReplicaStatus - this header is present if a replica was received by this cluster for this object version
	ReplicaStatus = "replica-status"

	// ReplicaTimestamp - the last time a replica was received by this cluster for this object version
	ReplicaTimestamp = "replica-timestamp"

	// TaggingTimestamp - the last time a tag metadata modification happened on this cluster for this object version
	TaggingTimestamp = "tagging-timestamp"

	// ObjectLockRetentionTimestamp - the last time an object lock metadata modification happened on this cluster for this object version
	ObjectLockRetentionTimestamp = "objectlock-retention-timestamp"

	// ObjectLockLegalHoldTimestamp - the last time a legal hold metadata modification happened on this cluster for this object version
	ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp"

	// ReplicationWorkerMultiplier is suggested worker multiplier if traffic exceeds replication worker capacity
	ReplicationWorkerMultiplier = 1.5
)
2021-07-28 15:20:01 -07:00
2020-07-30 19:55:22 -07:00
// gets replication config associated to a given bucket name.
func getReplicationConfig ( ctx context . Context , bucketName string ) ( rc * replication . Config , err error ) {
2020-07-21 17:49:56 -07:00
if globalIsGateway {
2020-10-09 09:59:52 -07:00
objAPI := newObjectLayerFn ( )
2020-07-21 17:49:56 -07:00
if objAPI == nil {
2021-09-18 16:31:35 -04:00
return rc , errServerNotInitialized
2020-07-21 17:49:56 -07:00
}
2021-09-18 16:31:35 -04:00
return rc , BucketReplicationConfigNotFound { Bucket : bucketName }
2020-07-21 17:49:56 -07:00
}
2022-04-24 15:06:31 +05:30
rCfg , _ , err := globalBucketMetadataSys . GetReplicationConfig ( ctx , bucketName )
return rCfg , err
2020-07-21 17:49:56 -07:00
}
2020-07-30 19:55:22 -07:00
// validateReplicationDestination returns error if replication destination bucket missing or not configured
2020-07-21 17:49:56 -07:00
// It also returns true if replication destination is same as this server.
2022-05-26 17:57:23 -07:00
func validateReplicationDestination ( ctx context . Context , bucket string , rCfg * replication . Config , checkRemote bool ) ( bool , APIError ) {
2021-09-18 16:31:35 -04:00
var arns [ ] string
if rCfg . RoleArn != "" {
arns = append ( arns , rCfg . RoleArn )
} else {
for _ , rule := range rCfg . Rules {
arns = append ( arns , rule . Destination . String ( ) )
}
2020-07-21 17:49:56 -07:00
}
2022-05-26 17:57:23 -07:00
var sameTarget bool
2021-09-18 16:31:35 -04:00
for _ , arnStr := range arns {
arn , err := madmin . ParseARN ( arnStr )
if err != nil {
2022-05-26 17:57:23 -07:00
return sameTarget , errorCodes . ToAPIErrWithErr ( ErrBucketRemoteArnInvalid , err )
2021-09-18 16:31:35 -04:00
}
if arn . Type != madmin . ReplicationService {
2022-05-26 17:57:23 -07:00
return sameTarget , toAPIError ( ctx , BucketRemoteArnTypeInvalid { Bucket : bucket } )
2021-09-18 16:31:35 -04:00
}
clnt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , arnStr )
if clnt == nil {
2022-05-26 17:57:23 -07:00
return sameTarget , toAPIError ( ctx , BucketRemoteTargetNotFound { Bucket : bucket } )
2021-09-18 16:31:35 -04:00
}
2022-05-26 17:57:23 -07:00
if checkRemote { // validate remote bucket
if found , err := clnt . BucketExists ( ctx , arn . Bucket ) ; ! found {
return sameTarget , errorCodes . ToAPIErrWithErr ( ErrRemoteDestinationNotFoundError , err )
}
if ret , err := globalBucketObjectLockSys . Get ( bucket ) ; err == nil {
if ret . LockEnabled {
lock , _ , _ , _ , err := clnt . GetObjectLockConfig ( ctx , arn . Bucket )
if err != nil || lock != "Enabled" {
return sameTarget , errorCodes . ToAPIErrWithErr ( ErrReplicationDestinationMissingLock , err )
}
2021-09-18 16:31:35 -04:00
}
2020-08-04 23:02:27 -07:00
}
}
2021-09-18 16:31:35 -04:00
// validate replication ARN against target endpoint
c , ok := globalBucketTargetSys . arnRemotesMap [ arnStr ]
if ok {
if c . EndpointURL ( ) . String ( ) == clnt . EndpointURL ( ) . String ( ) {
2022-05-26 17:57:23 -07:00
selfTarget , _ := isLocalHost ( clnt . EndpointURL ( ) . Hostname ( ) , clnt . EndpointURL ( ) . Port ( ) , globalMinioPort )
if ! sameTarget {
sameTarget = selfTarget
}
continue
2021-09-18 16:31:35 -04:00
}
2020-07-21 17:49:56 -07:00
}
}
2022-05-26 17:57:23 -07:00
if len ( arns ) == 0 {
return false , toAPIError ( ctx , BucketRemoteTargetNotFound { Bucket : bucket } )
}
return sameTarget , toAPIError ( ctx , nil )
2020-07-21 17:49:56 -07:00
}
2021-06-01 19:59:11 -07:00
type mustReplicateOptions struct {
2021-09-18 16:31:35 -04:00
meta map [ string ] string
status replication . StatusType
opType replication . Type
replicationRequest bool // incoming request is a replication request
2021-06-01 19:59:11 -07:00
}
// ReplicationStatus returns the replication status stored in the metadata
// under xhttp.AmzBucketReplicationStatus, or the zero StatusType when that
// key is absent.
func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
	raw, found := o.meta[xhttp.AmzBucketReplicationStatus]
	if !found {
		return s
	}
	return replication.StatusType(raw)
}
2022-01-02 09:15:06 -08:00
2021-06-01 19:59:11 -07:00
// isExistingObjectReplication reports whether the operation type is
// replication.ExistingObjectReplicationType.
func (o mustReplicateOptions) isExistingObjectReplication() bool {
	switch o.opType {
	case replication.ExistingObjectReplicationType:
		return true
	default:
		return false
	}
}
// isMetadataReplication reports whether the operation type is
// replication.MetadataReplicationType.
func (o mustReplicateOptions) isMetadataReplication() bool {
	switch o.opType {
	case replication.MetadataReplicationType:
		return true
	default:
		return false
	}
}
2022-01-02 09:15:06 -08:00
2021-09-18 16:31:35 -04:00
func getMustReplicateOptions ( o ObjectInfo , op replication . Type , opts ObjectOptions ) mustReplicateOptions {
2021-06-01 19:59:11 -07:00
if ! op . Valid ( ) {
op = replication . ObjectReplicationType
if o . metadataOnly {
op = replication . MetadataReplicationType
}
}
meta := cloneMSS ( o . UserDefined )
if o . UserTags != "" {
meta [ xhttp . AmzObjectTagging ] = o . UserTags
}
2021-09-18 16:31:35 -04:00
2021-06-01 19:59:11 -07:00
return mustReplicateOptions {
2021-09-18 16:31:35 -04:00
meta : meta ,
status : o . ReplicationStatus ,
opType : op ,
replicationRequest : opts . ReplicationRequest ,
2021-06-01 19:59:11 -07:00
}
}
2021-04-29 19:01:43 -07:00
2021-01-11 22:36:51 -08:00
// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
// a synchronous manner.
2021-09-18 16:31:35 -04:00
func mustReplicate ( ctx context . Context , bucket , object string , mopts mustReplicateOptions ) ( dsc ReplicateDecision ) {
2020-07-21 17:49:56 -07:00
if globalIsGateway {
2021-09-18 16:31:35 -04:00
return
2020-07-21 17:49:56 -07:00
}
2021-09-18 16:31:35 -04:00
2022-05-31 02:57:57 -07:00
// object layer not initialized we return with no decision.
if newObjectLayerFn ( ) == nil {
return
}
2022-05-06 19:05:28 -07:00
// Disable server-side replication on object prefixes which are excluded
// from versioning via the MinIO bucket versioning extension.
2022-08-24 13:46:29 -07:00
if ! globalBucketVersioningSys . PrefixEnabled ( bucket , object ) {
2022-05-06 19:05:28 -07:00
return
}
2022-05-31 02:57:57 -07:00
2021-06-01 19:59:11 -07:00
replStatus := mopts . ReplicationStatus ( )
if replStatus == replication . Replica && ! mopts . isMetadataReplication ( ) {
2021-09-18 16:31:35 -04:00
return
}
if mopts . replicationRequest { // incoming replication request on target cluster
return
2020-07-21 17:49:56 -07:00
}
2020-07-30 19:55:22 -07:00
cfg , err := getReplicationConfig ( ctx , bucket )
2020-07-21 17:49:56 -07:00
if err != nil {
2021-09-18 16:31:35 -04:00
return
2020-07-21 17:49:56 -07:00
}
opts := replication . ObjectOpts {
2021-06-01 19:59:11 -07:00
Name : object ,
SSEC : crypto . SSEC . IsEncrypted ( mopts . meta ) ,
Replica : replStatus == replication . Replica ,
ExistingObject : mopts . isExistingObjectReplication ( ) ,
2020-07-21 17:49:56 -07:00
}
2021-06-01 19:59:11 -07:00
tagStr , ok := mopts . meta [ xhttp . AmzObjectTagging ]
2020-07-21 17:49:56 -07:00
if ok {
opts . UserTags = tagStr
}
2021-09-18 16:31:35 -04:00
tgtArns := cfg . FilterTargetArns ( opts )
for _ , tgtArn := range tgtArns {
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgtArn )
// the target online status should not be used here while deciding
// whether to replicate as the target could be temporarily down
opts . TargetArn = tgtArn
replicate := cfg . Replicate ( opts )
var synchronous bool
if tgt != nil {
synchronous = tgt . replicateSync
}
dsc . Set ( newReplicateTargetDecision ( tgtArn , replicate , synchronous ) )
2021-01-11 22:36:51 -08:00
}
2021-09-18 16:31:35 -04:00
return dsc
2021-01-11 22:36:51 -08:00
}
// Standard headers that needs to be extracted from User metadata.
var standardHeaders = [ ] string {
2021-01-27 11:22:34 -08:00
xhttp . ContentType ,
xhttp . CacheControl ,
xhttp . ContentEncoding ,
xhttp . ContentLanguage ,
xhttp . ContentDisposition ,
2021-01-11 22:36:51 -08:00
xhttp . AmzStorageClass ,
xhttp . AmzObjectTagging ,
xhttp . AmzBucketReplicationStatus ,
2021-01-27 11:22:34 -08:00
xhttp . AmzObjectLockMode ,
xhttp . AmzObjectLockRetainUntilDate ,
xhttp . AmzObjectLockLegalHold ,
xhttp . AmzTagCount ,
xhttp . AmzServerSideEncryption ,
2020-07-21 17:49:56 -07:00
}
2020-11-19 18:43:58 -08:00
// hasReplicationRules reports whether the bucket's replication config has
// active rules for at least one of the objects being deleted. It returns
// false when the replication config cannot be loaded or is nil.
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
	cfg, err := getReplicationConfig(ctx, bucket)
	if cfg == nil || err != nil {
		return false
	}

	for i := range objects {
		if cfg.HasActiveRules(objects[i].ObjectName, true) {
			return true
		}
	}
	return false
}
2021-01-11 22:36:51 -08:00
// isStandardHeader returns true if header is a supported header and not a custom header
2021-02-03 20:41:33 -08:00
func isStandardHeader ( matchHeaderKey string ) bool {
return equals ( matchHeaderKey , standardHeaders ... )
2021-01-11 22:36:51 -08:00
}
2020-11-19 18:43:58 -08:00
// returns whether object version is a deletemarker and if object qualifies for replication
2021-09-18 16:31:35 -04:00
func checkReplicateDelete ( ctx context . Context , bucket string , dobj ObjectToDelete , oi ObjectInfo , delOpts ObjectOptions , gerr error ) ( dsc ReplicateDecision ) {
2020-11-19 18:43:58 -08:00
rcfg , err := getReplicationConfig ( ctx , bucket )
if err != nil || rcfg == nil {
2021-09-18 16:31:35 -04:00
return
}
// If incoming request is a replication request, it does not need to be re-replicated.
if delOpts . ReplicationRequest {
return
2020-11-19 18:43:58 -08:00
}
2022-05-06 19:05:28 -07:00
// Skip replication if this object's prefix is excluded from being
// versioned.
2022-05-07 22:06:44 -07:00
if ! delOpts . Versioned {
2022-05-06 19:05:28 -07:00
return
}
2021-02-18 16:35:37 -08:00
opts := replication . ObjectOpts {
Name : dobj . ObjectName ,
SSEC : crypto . SSEC . IsEncrypted ( oi . UserDefined ) ,
UserTags : oi . UserTags ,
DeleteMarker : oi . DeleteMarker ,
VersionID : dobj . VersionID ,
2021-03-13 10:28:35 -08:00
OpType : replication . DeleteReplicationType ,
2021-02-18 16:35:37 -08:00
}
2021-09-18 16:31:35 -04:00
tgtArns := rcfg . FilterTargetArns ( opts )
if len ( tgtArns ) > 0 {
dsc . targetsMap = make ( map [ string ] replicateTargetDecision , len ( tgtArns ) )
var sync , replicate bool
for _ , tgtArn := range tgtArns {
opts . TargetArn = tgtArn
replicate = rcfg . Replicate ( opts )
// when incoming delete is removal of a delete marker( a.k.a versioned delete),
// GetObjectInfo returns extra information even though it returns errFileNotFound
if gerr != nil {
validReplStatus := false
switch oi . TargetReplicationStatus ( tgtArn ) {
case replication . Pending , replication . Completed , replication . Failed :
validReplStatus = true
}
if oi . DeleteMarker && ( validReplStatus || replicate ) {
dsc . Set ( newReplicateTargetDecision ( tgtArn , replicate , sync ) )
continue
} else {
// can be the case that other cluster is down and duplicate `mc rm --vid`
// is issued - this still needs to be replicated back to the other target
replicate = oi . VersionPurgeStatus == Pending || oi . VersionPurgeStatus == Failed
dsc . Set ( newReplicateTargetDecision ( tgtArn , replicate , sync ) )
continue
}
}
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgtArn )
// the target online status should not be used here while deciding
// whether to replicate deletes as the target could be temporarily down
tgtDsc := newReplicateTargetDecision ( tgtArn , false , false )
if tgt != nil {
tgtDsc = newReplicateTargetDecision ( tgtArn , replicate , tgt . replicateSync )
}
dsc . Set ( tgtDsc )
}
}
return dsc
2020-11-19 18:43:58 -08:00
}
// replicate deletes to the designated replication target if replication configuration
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
// is specified) enabled.
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
// on target.
2022-07-12 10:43:32 -07:00
func replicateDelete ( ctx context . Context , dobj DeletedObjectReplicationInfo , objectAPI ObjectLayer ) {
2021-09-18 16:31:35 -04:00
var replicationStatus replication . StatusType
2020-11-19 18:43:58 -08:00
bucket := dobj . Bucket
2021-02-03 20:41:33 -08:00
versionID := dobj . DeleteMarkerVersionID
if versionID == "" {
versionID = dobj . VersionID
}
2021-07-01 14:02:44 -07:00
defer func ( ) {
2021-09-18 16:31:35 -04:00
replStatus := string ( replicationStatus )
2021-07-01 14:02:44 -07:00
auditLogInternal ( context . Background ( ) , bucket , dobj . ObjectName , AuditLogOptions {
2022-07-12 10:43:32 -07:00
Event : dobj . EventType ,
2021-07-01 14:02:44 -07:00
APIName : ReplicateDeleteAPI ,
VersionID : versionID ,
Status : replStatus ,
} )
} ( )
2020-11-19 18:43:58 -08:00
rcfg , err := getReplicationConfig ( ctx , bucket )
if err != nil || rcfg == nil {
2021-01-27 11:22:34 -08:00
logger . LogIf ( ctx , err )
2021-02-03 20:41:33 -08:00
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
2020-11-19 18:43:58 -08:00
return
}
2021-09-18 16:31:35 -04:00
dsc , err := parseReplicateDecision ( dobj . ReplicationState . ReplicateDecisionStr )
if err != nil {
logger . LogIf ( ctx , err )
2021-02-03 20:41:33 -08:00
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
2020-11-19 18:43:58 -08:00
return
}
2021-02-03 20:41:33 -08:00
2021-08-23 17:16:18 +02:00
// Lock the object name before starting replication operation.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI . NewNSLock ( bucket , "/[replicate]/" + dobj . ObjectName )
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
if err != nil {
2022-08-22 16:53:06 -07:00
globalReplicationPool . queueMRFSave ( dobj . ToMRFEntry ( ) )
2021-08-23 17:16:18 +02:00
logger . LogIf ( ctx , fmt . Errorf ( "failed to get lock for object: %s bucket:%s arn:%s" , dobj . ObjectName , bucket , rcfg . RoleArn ) )
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
return
}
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
2020-11-19 18:43:58 -08:00
2021-09-18 16:31:35 -04:00
var wg sync . WaitGroup
var rinfos replicatedInfos
rinfos . Targets = make ( [ ] replicatedTargetInfo , len ( dsc . targetsMap ) )
idx := - 1
for tgtArn := range dsc . targetsMap {
idx ++
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgtArn )
if tgt == nil {
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for bucket:%s arn:%s" , bucket , tgtArn ) )
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
continue
2020-11-19 18:43:58 -08:00
}
2021-09-18 16:31:35 -04:00
if tgt := dsc . targetsMap [ tgtArn ] ; ! tgt . Replicate {
continue
2020-11-19 18:43:58 -08:00
}
2021-09-18 16:31:35 -04:00
// if dobj.TargetArn is not empty string, this is a case of specific target being re-synced.
if dobj . TargetArn != "" && dobj . TargetArn != tgt . ARN {
continue
}
wg . Add ( 1 )
go func ( index int , tgt * TargetClient ) {
defer wg . Done ( )
2022-03-16 16:59:43 -07:00
rinfo := replicateDeleteToTarget ( ctx , dobj , tgt )
2021-09-18 16:31:35 -04:00
rinfos . Targets [ index ] = rinfo
} ( idx , tgt )
2020-11-19 18:43:58 -08:00
}
2021-09-18 16:31:35 -04:00
wg . Wait ( )
replicationStatus = rinfos . ReplicationStatus ( )
prevStatus := dobj . DeleteMarkerReplicationStatus ( )
2021-04-03 09:03:42 -07:00
if dobj . VersionID != "" {
2021-09-18 16:31:35 -04:00
prevStatus = replication . StatusType ( dobj . VersionPurgeStatus ( ) )
replicationStatus = replication . StatusType ( rinfos . VersionPurgeStatus ( ) )
2021-04-03 09:03:42 -07:00
}
2021-09-18 16:31:35 -04:00
2021-04-04 15:34:33 -07:00
// to decrement pending count later.
2021-09-18 16:31:35 -04:00
for _ , rinfo := range rinfos . Targets {
if rinfo . ReplicationStatus != rinfo . PrevReplicationStatus {
2021-11-17 21:10:57 +01:00
globalReplicationStats . Update ( dobj . Bucket , rinfo . Arn , 0 , 0 , replicationStatus ,
2021-09-18 16:31:35 -04:00
prevStatus , replication . DeleteReplicationType )
}
}
2021-01-25 14:04:41 -08:00
2022-01-02 09:15:06 -08:00
eventName := event . ObjectReplicationComplete
2021-09-18 16:31:35 -04:00
if replicationStatus == replication . Failed {
2020-11-21 23:48:50 -08:00
eventName = event . ObjectReplicationFailed
2022-08-22 16:53:06 -07:00
globalReplicationPool . queueMRFSave ( dobj . ToMRFEntry ( ) )
2020-11-21 23:48:50 -08:00
}
2021-09-18 16:31:35 -04:00
drs := getReplicationState ( rinfos , dobj . ReplicationState , dobj . VersionID )
2022-07-21 11:05:44 -07:00
if replicationStatus != prevStatus {
drs . ReplicationTimeStamp = UTCNow ( )
}
2021-02-03 20:41:33 -08:00
dobjInfo , err := objectAPI . DeleteObject ( ctx , bucket , dobj . ObjectName , ObjectOptions {
2021-09-18 16:31:35 -04:00
VersionID : versionID ,
MTime : dobj . DeleteMarkerMTime . Time ,
DeleteReplication : drs ,
2022-05-06 19:05:28 -07:00
Versioned : globalBucketVersioningSys . PrefixEnabled ( bucket , dobj . ObjectName ) ,
2022-08-24 13:46:29 -07:00
// Objects matching prefixes should not leave delete markers,
// dramatically reduces namespace pollution while keeping the
// benefits of replication, make sure to apply version suspension
// only at bucket level instead.
VersionSuspended : globalBucketVersioningSys . Suspended ( bucket ) ,
2021-01-25 14:04:41 -08:00
} )
2021-02-09 15:11:43 -08:00
if err != nil && ! isErrVersionNotFound ( err ) { // VersionNotFound would be reported by pool that object version is missing on.
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s/%s(%s): %s" , bucket , dobj . ObjectName , versionID , err ) )
2021-02-03 20:41:33 -08:00
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : eventName ,
} )
} else {
sendEvent ( eventArgs {
BucketName : bucket ,
Object : dobjInfo ,
Host : "Internal: [Replication]" ,
EventName : eventName ,
} )
2020-11-19 18:43:58 -08:00
}
}
2022-03-16 16:59:43 -07:00
func replicateDeleteToTarget ( ctx context . Context , dobj DeletedObjectReplicationInfo , tgt * TargetClient ) ( rinfo replicatedTargetInfo ) {
2021-09-18 16:31:35 -04:00
versionID := dobj . DeleteMarkerVersionID
if versionID == "" {
versionID = dobj . VersionID
}
rinfo = dobj . ReplicationState . targetState ( tgt . ARN )
rinfo . OpType = dobj . OpType
defer func ( ) {
if rinfo . ReplicationStatus == replication . Completed && tgt . ResetID != "" && dobj . OpType == replication . ExistingObjectReplicationType {
rinfo . ResyncTimestamp = fmt . Sprintf ( "%s;%s" , UTCNow ( ) . Format ( http . TimeFormat ) , tgt . ResetID )
}
} ( )
if dobj . VersionID == "" && rinfo . PrevReplicationStatus == replication . Completed && dobj . OpType != replication . ExistingObjectReplicationType {
rinfo . ReplicationStatus = rinfo . PrevReplicationStatus
return
}
if dobj . VersionID != "" && rinfo . VersionPurgeStatus == Complete {
return
}
2022-08-16 17:46:22 -07:00
if globalBucketTargetSys . isOffline ( tgt . EndpointURL ( ) ) {
2021-09-18 16:31:35 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "remote target is offline for bucket:%s arn:%s" , dobj . Bucket , tgt . ARN ) )
sendEvent ( eventArgs {
BucketName : dobj . Bucket ,
Object : ObjectInfo {
Bucket : dobj . Bucket ,
Name : dobj . ObjectName ,
VersionID : dobj . VersionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
if dobj . VersionID == "" {
rinfo . ReplicationStatus = replication . Failed
} else {
rinfo . VersionPurgeStatus = Failed
}
return
}
2021-12-16 15:34:55 -08:00
// early return if already replicated delete marker for existing object replication/ healing delete markers
if dobj . DeleteMarkerVersionID != "" && ( dobj . OpType == replication . ExistingObjectReplicationType || dobj . OpType == replication . HealReplicationType ) {
2021-09-18 16:31:35 -04:00
if _ , err := tgt . StatObject ( ctx , tgt . Bucket , dobj . ObjectName , miniogo . StatObjectOptions {
VersionID : versionID ,
Internal : miniogo . AdvancedGetOptions {
ReplicationProxyRequest : "false" ,
2022-01-02 09:15:06 -08:00
} ,
} ) ; isErrMethodNotAllowed ( ErrorRespToObjectError ( err , dobj . Bucket , dobj . ObjectName ) ) {
2021-09-18 16:31:35 -04:00
if dobj . VersionID == "" {
rinfo . ReplicationStatus = replication . Completed
2021-12-16 15:34:55 -08:00
return
2021-09-18 16:31:35 -04:00
}
}
}
rmErr := tgt . RemoveObject ( ctx , tgt . Bucket , dobj . ObjectName , miniogo . RemoveObjectOptions {
VersionID : versionID ,
Internal : miniogo . AdvancedRemoveOptions {
ReplicationDeleteMarker : dobj . DeleteMarkerVersionID != "" ,
ReplicationMTime : dobj . DeleteMarkerMTime . Time ,
ReplicationStatus : miniogo . ReplicationStatusReplica ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
} ,
} )
if rmErr != nil {
if dobj . VersionID == "" {
rinfo . ReplicationStatus = replication . Failed
} else {
rinfo . VersionPurgeStatus = Failed
}
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate delete marker to %s/%s(%s): %s" , tgt . Bucket , dobj . ObjectName , versionID , rmErr ) )
} else {
if dobj . VersionID == "" {
rinfo . ReplicationStatus = replication . Completed
} else {
rinfo . VersionPurgeStatus = Complete
}
}
return
}
func getCopyObjMetadata ( oi ObjectInfo , sc string ) map [ string ] string {
2020-11-19 11:50:22 -08:00
meta := make ( map [ string ] string , len ( oi . UserDefined ) )
for k , v := range oi . UserDefined {
2021-02-03 20:41:33 -08:00
if strings . HasPrefix ( strings . ToLower ( k ) , ReservedMetadataPrefixLower ) {
2020-11-19 11:50:22 -08:00
continue
}
2021-02-03 20:41:33 -08:00
if equals ( k , xhttp . AmzBucketReplicationStatus ) {
continue
}
// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
if equals ( k , xhttp . AmzMetaUnencryptedContentLength , xhttp . AmzMetaUnencryptedContentMD5 ) {
2020-11-19 11:50:22 -08:00
continue
}
meta [ k ] = v
}
2021-02-03 20:41:33 -08:00
2020-11-19 11:50:22 -08:00
if oi . ContentEncoding != "" {
meta [ xhttp . ContentEncoding ] = oi . ContentEncoding
}
2021-02-03 20:41:33 -08:00
2020-11-19 11:50:22 -08:00
if oi . ContentType != "" {
meta [ xhttp . ContentType ] = oi . ContentType
}
2021-02-03 20:41:33 -08:00
if oi . UserTags != "" {
meta [ xhttp . AmzObjectTagging ] = oi . UserTags
2020-11-19 11:50:22 -08:00
meta [ xhttp . AmzTagDirective ] = "REPLACE"
}
2021-02-03 20:41:33 -08:00
2020-11-19 11:50:22 -08:00
if sc == "" {
sc = oi . StorageClass
}
2021-04-19 10:30:42 -07:00
// drop non standard storage classes for tiering from replication
if sc != "" && ( sc == storageclass . RRS || sc == storageclass . STANDARD ) {
2021-02-03 20:41:33 -08:00
meta [ xhttp . AmzStorageClass ] = sc
2020-11-19 11:50:22 -08:00
}
2021-04-19 10:30:42 -07:00
2020-11-19 11:50:22 -08:00
meta [ xhttp . MinIOSourceETag ] = oi . ETag
2021-02-03 20:41:33 -08:00
meta [ xhttp . MinIOSourceMTime ] = oi . ModTime . Format ( time . RFC3339Nano )
2020-11-19 11:50:22 -08:00
meta [ xhttp . AmzBucketReplicationStatus ] = replication . Replica . String ( )
return meta
}
2021-02-08 18:12:28 -08:00
// caseInsensitiveMap is a string map whose entries can be looked up without
// regard to the letter case of the key.
type caseInsensitiveMap map[string]string

// Lookup map entry case insensitively.
//
// The key as given, its lowercase form and its canonical HTTP header form are
// tried first, in that order, as direct map lookups. If none is present the
// map is scanned for a key equal to key under Unicode case folding; when
// several such keys exist the lexicographically smallest one is used so the
// result does not depend on map iteration order. It returns the value and
// true on a match, or "" and false otherwise (including for an empty map).
func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
	if len(m) == 0 {
		return "", false
	}

	// Fast path: the spellings keys are normally stored under.
	for _, k := range [...]string{
		key,
		strings.ToLower(key),
		http.CanonicalHeaderKey(key),
	} {
		if v, ok := m[k]; ok {
			return v, true
		}
	}

	// Slow path: any other mix of letter case.
	var (
		match string
		found bool
	)
	for k := range m {
		if strings.EqualFold(k, key) && (!found || k < match) {
			match, found = k, true
		}
	}
	if !found {
		return "", false
	}
	return m[match], true
}
2021-09-18 16:31:35 -04:00
func putReplicationOpts ( ctx context . Context , sc string , objInfo ObjectInfo ) ( putOpts miniogo . PutObjectOptions , err error ) {
2020-07-21 17:49:56 -07:00
meta := make ( map [ string ] string )
for k , v := range objInfo . UserDefined {
2021-01-11 22:36:51 -08:00
if strings . HasPrefix ( strings . ToLower ( k ) , ReservedMetadataPrefixLower ) {
2020-07-21 17:49:56 -07:00
continue
}
2021-01-11 22:36:51 -08:00
if isStandardHeader ( k ) {
2020-08-12 17:32:24 -07:00
continue
}
2020-07-21 17:49:56 -07:00
meta [ k ] = v
}
2021-02-03 20:41:33 -08:00
2021-04-19 10:30:42 -07:00
if sc == "" && ( objInfo . StorageClass == storageclass . STANDARD || objInfo . StorageClass == storageclass . RRS ) {
2020-08-05 20:01:20 -07:00
sc = objInfo . StorageClass
}
2020-07-21 17:49:56 -07:00
putOpts = miniogo . PutObjectOptions {
2020-10-06 08:37:09 -07:00
UserMetadata : meta ,
ContentType : objInfo . ContentType ,
ContentEncoding : objInfo . ContentEncoding ,
StorageClass : sc ,
Internal : miniogo . AdvancedPutOptions {
2021-03-03 11:13:31 -08:00
SourceVersionID : objInfo . VersionID ,
ReplicationStatus : miniogo . ReplicationStatusReplica ,
SourceMTime : objInfo . ModTime ,
SourceETag : objInfo . ETag ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
2020-10-06 08:37:09 -07:00
} ,
2020-07-21 17:49:56 -07:00
}
2021-02-03 20:41:33 -08:00
if objInfo . UserTags != "" {
tag , _ := tags . ParseObjectTags ( objInfo . UserTags )
if tag != nil {
putOpts . UserTags = tag . ToMap ( )
2021-09-18 16:31:35 -04:00
// set tag timestamp in opts
tagTimestamp := objInfo . ModTime
if tagTmstampStr , ok := objInfo . UserDefined [ ReservedMetadataPrefixLower + TaggingTimestamp ] ; ok {
tagTimestamp , err = time . Parse ( time . RFC3339Nano , tagTmstampStr )
if err != nil {
return putOpts , err
}
}
putOpts . Internal . TaggingTimestamp = tagTimestamp
2021-02-03 20:41:33 -08:00
}
}
2021-02-08 18:12:28 -08:00
lkMap := caseInsensitiveMap ( objInfo . UserDefined )
if lang , ok := lkMap . Lookup ( xhttp . ContentLanguage ) ; ok {
2021-01-27 11:22:34 -08:00
putOpts . ContentLanguage = lang
}
2021-02-08 18:12:28 -08:00
if disp , ok := lkMap . Lookup ( xhttp . ContentDisposition ) ; ok {
2021-01-27 11:22:34 -08:00
putOpts . ContentDisposition = disp
}
2021-02-08 18:12:28 -08:00
if cc , ok := lkMap . Lookup ( xhttp . CacheControl ) ; ok {
2021-01-27 11:22:34 -08:00
putOpts . CacheControl = cc
}
2021-02-08 18:12:28 -08:00
if mode , ok := lkMap . Lookup ( xhttp . AmzObjectLockMode ) ; ok {
2020-07-21 17:49:56 -07:00
rmode := miniogo . RetentionMode ( mode )
putOpts . Mode = rmode
}
2021-02-08 18:12:28 -08:00
if retainDateStr , ok := lkMap . Lookup ( xhttp . AmzObjectLockRetainUntilDate ) ; ok {
rdate , err := time . Parse ( time . RFC3339 , retainDateStr )
2020-07-21 17:49:56 -07:00
if err != nil {
2021-02-08 18:12:28 -08:00
return putOpts , err
2020-07-21 17:49:56 -07:00
}
putOpts . RetainUntilDate = rdate
2021-09-18 16:31:35 -04:00
// set retention timestamp in opts
retTimestamp := objInfo . ModTime
if retainTmstampStr , ok := objInfo . UserDefined [ ReservedMetadataPrefixLower + ObjectLockRetentionTimestamp ] ; ok {
retTimestamp , err = time . Parse ( time . RFC3339Nano , retainTmstampStr )
if err != nil {
return putOpts , err
}
}
putOpts . Internal . RetentionTimestamp = retTimestamp
2020-07-21 17:49:56 -07:00
}
2021-02-08 18:12:28 -08:00
if lhold , ok := lkMap . Lookup ( xhttp . AmzObjectLockLegalHold ) ; ok {
2020-07-21 17:49:56 -07:00
putOpts . LegalHold = miniogo . LegalHoldStatus ( lhold )
2021-09-18 16:31:35 -04:00
// set legalhold timestamp in opts
lholdTimestamp := objInfo . ModTime
if lholdTmstampStr , ok := objInfo . UserDefined [ ReservedMetadataPrefixLower + ObjectLockLegalHoldTimestamp ] ; ok {
lholdTimestamp , err = time . Parse ( time . RFC3339Nano , lholdTmstampStr )
if err != nil {
return putOpts , err
}
}
putOpts . Internal . LegalholdTimestamp = lholdTimestamp
2020-07-21 17:49:56 -07:00
}
if crypto . S3 . IsEncrypted ( objInfo . UserDefined ) {
putOpts . ServerSideEncryption = encrypt . NewSSE ( )
}
return
}
2020-11-19 11:50:22 -08:00
// replicationAction names how much of an object must be sent to a
// replication target to bring it in sync with the source.
type replicationAction string

const (
	// replicateMetadata - only metadata/tags differ on the target.
	replicateMetadata = replicationAction("metadata")
	// replicateNone - source and target already agree, nothing to send.
	replicateNone = replicationAction("none")
	// replicateAll - object data (and metadata) must be sent in full.
	replicateAll = replicationAction("all")
)
2021-02-03 20:41:33 -08:00
// equals reports whether k1 is equal, ignoring case, to at least one of keys.
// It returns false when no keys are supplied.
func equals(k1 string, keys ...string) bool {
	for i := 0; i < len(keys); i++ {
		if !strings.EqualFold(k1, keys[i]) {
			continue
		}
		return true
	}
	return false
}
2020-11-19 11:50:22 -08:00
// returns replicationAction by comparing metadata between source and target
2021-09-28 13:26:12 -04:00
func getReplicationAction ( oi1 ObjectInfo , oi2 minio . ObjectInfo , opType replication . Type ) replicationAction {
// Avoid resyncing null versions created prior to enabling replication if target has a newer copy
if opType == replication . ExistingObjectReplicationType &&
oi1 . ModTime . Unix ( ) > oi2 . LastModified . Unix ( ) && oi1 . VersionID == nullVersionID {
return replicateNone
}
2020-11-19 11:50:22 -08:00
// needs full replication
if oi1 . ETag != oi2 . ETag ||
oi1 . VersionID != oi2 . VersionID ||
oi1 . Size != oi2 . Size ||
2021-01-27 11:22:34 -08:00
oi1 . DeleteMarker != oi2 . IsDeleteMarker ||
2021-02-03 20:41:33 -08:00
oi1 . ModTime . Unix ( ) != oi2 . LastModified . Unix ( ) {
2020-11-19 11:50:22 -08:00
return replicateAll
}
2021-02-03 20:41:33 -08:00
2021-01-27 11:22:34 -08:00
if oi1 . ContentType != oi2 . ContentType {
2020-11-19 11:50:22 -08:00
return replicateMetadata
}
2021-02-03 20:41:33 -08:00
2020-11-19 11:50:22 -08:00
if oi1 . ContentEncoding != "" {
2021-01-27 11:22:34 -08:00
enc , ok := oi2 . Metadata [ xhttp . ContentEncoding ]
2021-02-03 20:41:33 -08:00
if ! ok {
enc , ok = oi2 . Metadata [ strings . ToLower ( xhttp . ContentEncoding ) ]
if ! ok {
return replicateMetadata
}
}
if strings . Join ( enc , "," ) != oi1 . ContentEncoding {
2020-11-19 11:50:22 -08:00
return replicateMetadata
}
}
2021-02-03 20:41:33 -08:00
t , _ := tags . ParseObjectTags ( oi1 . UserTags )
2022-01-10 19:06:10 -08:00
if ! reflect . DeepEqual ( oi2 . UserTags , t . ToMap ( ) ) || ( oi2 . UserTagCount != len ( t . ToMap ( ) ) ) {
2020-11-19 11:50:22 -08:00
return replicateMetadata
}
2021-02-03 20:41:33 -08:00
// Compare only necessary headers
compareKeys := [ ] string {
"Expires" ,
"Cache-Control" ,
"Content-Language" ,
"Content-Disposition" ,
"X-Amz-Object-Lock-Mode" ,
"X-Amz-Object-Lock-Retain-Until-Date" ,
"X-Amz-Object-Lock-Legal-Hold" ,
"X-Amz-Website-Redirect-Location" ,
"X-Amz-Meta-" ,
}
// compare metadata on both maps to see if meta is identical
compareMeta1 := make ( map [ string ] string )
for k , v := range oi1 . UserDefined {
var found bool
for _ , prefix := range compareKeys {
if ! strings . HasPrefix ( strings . ToLower ( k ) , strings . ToLower ( prefix ) ) {
continue
}
found = true
break
}
if found {
compareMeta1 [ strings . ToLower ( k ) ] = v
2021-01-27 11:22:34 -08:00
}
}
2021-02-03 20:41:33 -08:00
compareMeta2 := make ( map [ string ] string )
for k , v := range oi2 . Metadata {
var found bool
for _ , prefix := range compareKeys {
if ! strings . HasPrefix ( strings . ToLower ( k ) , strings . ToLower ( prefix ) ) {
continue
}
found = true
break
2021-01-27 11:22:34 -08:00
}
2021-02-03 20:41:33 -08:00
if found {
compareMeta2 [ strings . ToLower ( k ) ] = strings . Join ( v , "," )
2020-11-19 11:50:22 -08:00
}
}
2021-02-03 20:41:33 -08:00
if ! reflect . DeepEqual ( compareMeta1 , compareMeta2 ) {
2020-11-19 11:50:22 -08:00
return replicateMetadata
}
2021-02-03 20:41:33 -08:00
2020-11-19 11:50:22 -08:00
return replicateNone
}
2020-07-21 17:49:56 -07:00
// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
2022-07-12 10:43:32 -07:00
func replicateObject ( ctx context . Context , ri ReplicateObjectInfo , objectAPI ObjectLayer ) {
2021-07-01 14:02:44 -07:00
var replicationStatus replication . StatusType
defer func ( ) {
if replicationStatus . Empty ( ) {
// replication status is empty means
// replication was not attempted for some
// reason, notify the state of the object
// on disk.
replicationStatus = ri . ReplicationStatus
}
auditLogInternal ( ctx , ri . Bucket , ri . Name , AuditLogOptions {
2022-07-12 10:43:32 -07:00
Event : ri . EventType ,
2021-07-01 14:02:44 -07:00
APIName : ReplicateObjectAPI ,
VersionID : ri . VersionID ,
Status : replicationStatus . String ( ) ,
} )
} ( )
2021-04-15 16:32:00 -07:00
objInfo := ri . ObjectInfo
2020-09-16 16:04:55 -07:00
bucket := objInfo . Bucket
object := objInfo . Name
2020-07-30 19:55:22 -07:00
cfg , err := getReplicationConfig ( ctx , bucket )
2020-07-21 17:49:56 -07:00
if err != nil {
logger . LogIf ( ctx , err )
2021-02-03 20:41:33 -08:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2020-07-21 17:49:56 -07:00
return
}
2021-09-18 16:31:35 -04:00
tgtArns := cfg . FilterTargetArns ( replication . ObjectOpts {
2022-01-19 10:45:42 -08:00
Name : object ,
SSEC : crypto . SSEC . IsEncrypted ( objInfo . UserDefined ) ,
UserTags : objInfo . UserTags ,
2021-09-18 16:31:35 -04:00
} )
// Lock the object name before starting replication.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI . NewNSLock ( bucket , "/[replicate]/" + object )
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
if err != nil {
2021-02-03 20:41:33 -08:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2022-08-22 16:53:06 -07:00
globalReplicationPool . queueMRFSave ( ri . ToMRFEntry ( ) )
2021-09-18 16:31:35 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "failed to get lock for object: %s bucket:%s arn:%s" , object , bucket , cfg . RoleArn ) )
2020-07-21 17:49:56 -07:00
return
}
2021-09-18 16:31:35 -04:00
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
var wg sync . WaitGroup
var rinfos replicatedInfos
rinfos . Targets = make ( [ ] replicatedTargetInfo , len ( tgtArns ) )
for i , tgtArn := range tgtArns {
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgtArn )
if tgt == nil {
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for bucket:%s arn:%s" , bucket , tgtArn ) )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
continue
}
wg . Add ( 1 )
go func ( index int , tgt * TargetClient ) {
defer wg . Done ( )
rinfos . Targets [ index ] = replicateObjectToTarget ( ctx , ri , objectAPI , tgt )
} ( i , tgt )
}
wg . Wait ( )
// FIXME: add support for missing replication events
// - event.ObjectReplicationMissedThreshold
// - event.ObjectReplicationReplicatedAfterThreshold
2022-01-02 09:15:06 -08:00
eventName := event . ObjectReplicationComplete
2021-09-18 16:31:35 -04:00
if rinfos . ReplicationStatus ( ) == replication . Failed {
eventName = event . ObjectReplicationFailed
}
newReplStatusInternal := rinfos . ReplicationStatusInternal ( )
// Note that internal replication status(es) may match for previously replicated objects - in such cases
// metadata should be updated with last resync timestamp.
if objInfo . ReplicationStatusInternal != newReplStatusInternal || rinfos . ReplicationResynced ( ) {
popts := ObjectOptions {
2021-10-30 08:22:04 -07:00
MTime : objInfo . ModTime ,
VersionID : objInfo . VersionID ,
EvalMetadataFn : func ( oi ObjectInfo ) error {
oi . UserDefined [ ReservedMetadataPrefixLower + ReplicationStatus ] = newReplStatusInternal
oi . UserDefined [ ReservedMetadataPrefixLower + ReplicationTimestamp ] = UTCNow ( ) . Format ( time . RFC3339Nano )
oi . UserDefined [ xhttp . AmzBucketReplicationStatus ] = string ( rinfos . ReplicationStatus ( ) )
for _ , rinfo := range rinfos . Targets {
if rinfo . ResyncTimestamp != "" {
oi . UserDefined [ targetResetHeader ( rinfo . Arn ) ] = rinfo . ResyncTimestamp
}
}
if objInfo . UserTags != "" {
oi . UserDefined [ xhttp . AmzObjectTagging ] = objInfo . UserTags
}
return nil
} ,
2021-09-18 16:31:35 -04:00
}
2021-10-30 08:22:04 -07:00
2021-09-18 16:31:35 -04:00
if _ , err = objectAPI . PutObjectMetadata ( ctx , bucket , object , popts ) ; err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s/%s(%s): %w" ,
bucket , objInfo . Name , objInfo . VersionID , err ) )
}
opType := replication . MetadataReplicationType
if rinfos . Action ( ) == replicateAll {
opType = replication . ObjectReplicationType
}
for _ , rinfo := range rinfos . Targets {
if rinfo . ReplicationStatus != rinfo . PrevReplicationStatus {
2021-11-17 21:10:57 +01:00
globalReplicationStats . Update ( bucket , rinfo . Arn , rinfo . Size , rinfo . Duration , rinfo . ReplicationStatus , rinfo . PrevReplicationStatus , opType )
2021-09-18 16:31:35 -04:00
}
}
}
sendEvent ( eventArgs {
EventName : eventName ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
// re-queue failures once more - keep a retry count to avoid flooding the queue if
// the target site is down. Leave it to scanner to catch up instead.
2022-08-22 16:53:06 -07:00
if rinfos . ReplicationStatus ( ) != replication . Completed {
2021-09-18 16:31:35 -04:00
ri . OpType = replication . HealReplicationType
2022-07-12 10:43:32 -07:00
ri . EventType = ReplicateMRF
2021-09-18 16:31:35 -04:00
ri . ReplicationStatusInternal = rinfos . ReplicationStatusInternal ( )
ri . RetryCount ++
2022-08-22 16:53:06 -07:00
globalReplicationPool . queueMRFSave ( ri . ToMRFEntry ( ) )
2021-09-18 16:31:35 -04:00
}
}
// replicateObjectToTarget replicates the specified version of the object to the
// bucket of a single remote target (tgt) and returns the outcome as rinfo.
//
// rinfo starts out with ReplicationStatus Failed and the size/ARN/previous
// status of the source object; it is switched to Completed when the target is
// found to already hold a matching copy or when the copy/upload below succeeds.
// This function itself does not write to the source object's metadata.
func replicateObjectToTarget ( ctx context . Context , ri ReplicateObjectInfo , objectAPI ObjectLayer , tgt * TargetClient ) ( rinfo replicatedTargetInfo ) {
2021-11-17 21:10:57 +01:00
startTime := time . Now ( )
2021-09-22 13:48:45 -04:00
objInfo := ri . ObjectInfo . Clone ( )
2021-09-18 16:31:35 -04:00
bucket := objInfo . Bucket
object := objInfo . Name
var (
// closeOnDefer is true while gr is open and must still be closed by the
// deferred cleanup; it is cleared wherever gr is closed explicitly.
closeOnDefer bool
gr * GetObjectReader
size int64
err error
)
// NOTE(review): the error from GetActualSize is discarded here; sz is only
// the initial rinfo.Size and is replaced by size after the object is opened.
sz , _ := objInfo . GetActualSize ( )
// set defaults for replication action based on operation being performed - actual
// replication action can only be determined after stat on remote. This default is
// needed for updating replication metrics correctly when target is offline.
var rAction replicationAction
switch ri . OpType {
case replication . MetadataReplicationType :
rAction = replicateMetadata
default :
rAction = replicateAll
}
rinfo = replicatedTargetInfo {
Size : sz ,
Arn : tgt . ARN ,
PrevReplicationStatus : objInfo . TargetReplicationStatus ( tgt . ARN ) ,
ReplicationStatus : replication . Failed ,
OpType : ri . OpType ,
ReplicationAction : rAction ,
}
2022-02-10 10:16:52 -08:00
2021-09-18 16:31:35 -04:00
// Already Completed for this target, a resync decision is present, and it
// does not require this target: report Completed without contacting it.
if ri . ObjectInfo . TargetReplicationStatus ( tgt . ARN ) == replication . Completed && ! ri . ExistingObjResync . Empty ( ) && ! ri . ExistingObjResync . mustResyncTarget ( tgt . ARN ) {
rinfo . ReplicationStatus = replication . Completed
rinfo . ReplicationResynced = true
2021-09-08 18:34:50 -04:00
return
}
2022-08-16 17:46:22 -07:00
// Offline target: return with the default Failed status.
if globalBucketTargetSys . isOffline ( tgt . EndpointURL ( ) ) {
2021-09-18 16:31:35 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "remote target is offline for bucket:%s arn:%s" , bucket , tgt . ARN ) )
2021-08-23 17:16:18 +02:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
return
}
2022-06-06 15:14:56 -07:00
versioned := globalBucketVersioningSys . PrefixEnabled ( bucket , object )
versionSuspended := globalBucketVersioningSys . PrefixSuspended ( bucket , object )
2021-09-18 16:31:35 -04:00
// Open the source version under a read lock.
gr , err = objectAPI . GetObjectNInfo ( ctx , bucket , object , nil , http . Header { } , readLock , ObjectOptions {
2022-06-06 15:14:56 -07:00
VersionID : objInfo . VersionID ,
Versioned : versioned ,
VersionSuspended : versionSuspended ,
2020-09-15 20:44:48 -07:00
} )
2020-07-21 17:49:56 -07:00
if err != nil {
2022-08-19 16:21:05 -07:00
// An object-not-found error is returned silently (no event, no log).
if ! isErrObjectNotFound ( err ) {
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
// NOTE(review): the message mentions updating replicate metadata, but the
// call that failed here is GetObjectNInfo - confirm the wording.
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replicate metadata for %s/%s(%s): %w" , bucket , object , objInfo . VersionID , err ) )
}
2020-07-21 17:49:56 -07:00
return
}
2021-06-28 23:58:08 -07:00
defer func ( ) {
if closeOnDefer {
gr . Close ( )
}
} ( )
closeOnDefer = true
2021-02-03 20:41:33 -08:00
2020-09-16 16:04:55 -07:00
// From here on use the object info returned with the opened reader.
objInfo = gr . ObjInfo
2021-09-18 16:31:35 -04:00
size , err = objInfo . GetActualSize ( )
2020-07-21 17:49:56 -07:00
if err != nil {
logger . LogIf ( ctx , err )
2021-02-03 20:41:33 -08:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2020-07-21 17:49:56 -07:00
return
}
2021-09-18 16:31:35 -04:00
if tgt . Bucket == "" {
2021-02-03 20:41:33 -08:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate object %s(%s), bucket is empty" , objInfo . Name , objInfo . VersionID ) )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2021-09-18 16:31:35 -04:00
return rinfo
2020-07-21 17:49:56 -07:00
}
2022-02-10 10:16:52 -08:00
// Runs on every return below this point: stamps the resync timestamp for a
// completed existing-object replication with a reset ID, and records the
// elapsed time. Returns above this line leave Duration at its zero value.
defer func ( ) {
if rinfo . ReplicationStatus == replication . Completed && ri . OpType == replication . ExistingObjectReplicationType && tgt . ResetID != "" {
rinfo . ResyncTimestamp = fmt . Sprintf ( "%s;%s" , UTCNow ( ) . Format ( http . TimeFormat ) , tgt . ResetID )
rinfo . ReplicationResynced = true
}
rinfo . Duration = time . Since ( startTime )
} ( )
2020-09-16 16:04:55 -07:00
2021-09-18 16:31:35 -04:00
// Assume full replication unless the stat on the target says otherwise.
rAction = replicateAll
oi , cerr := tgt . StatObject ( ctx , tgt . Bucket , object , miniogo . StatObjectOptions {
2021-01-27 11:22:34 -08:00
VersionID : objInfo . VersionID ,
Internal : miniogo . AdvancedGetOptions {
ReplicationProxyRequest : "false" ,
2022-01-02 09:15:06 -08:00
} ,
} )
2021-09-18 16:31:35 -04:00
// cerr == nil: the stat succeeded, so compare source and target to decide
// what (if anything) still has to be sent.
if cerr == nil {
2021-09-28 13:26:12 -04:00
rAction = getReplicationAction ( objInfo , oi , ri . OpType )
2021-09-18 16:31:35 -04:00
rinfo . ReplicationStatus = replication . Completed
if rAction == replicateNone {
2021-09-28 13:26:12 -04:00
if ri . OpType == replication . ExistingObjectReplicationType &&
objInfo . ModTime . Unix ( ) > oi . LastModified . Unix ( ) && objInfo . VersionID == nullVersionID {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate %s/%s (null). Newer version exists on target" , bucket , object ) )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
}
2020-07-21 17:49:56 -07:00
// object with same VersionID already exists, replication kicked off by
2021-04-03 09:03:42 -07:00
// PutObject might have completed
2021-09-18 16:31:35 -04:00
if objInfo . TargetReplicationStatus ( tgt . ARN ) == replication . Pending || objInfo . TargetReplicationStatus ( tgt . ARN ) == replication . Failed || ri . OpType == replication . ExistingObjectReplicationType {
2021-07-01 14:02:44 -07:00
// if metadata is not updated for some reason after replication, such as
// 503 encountered while updating metadata - make sure to set ReplicationStatus
// as Completed.
//
// Note: Replication Stats would have been updated despite metadata update failure.
2021-06-28 23:58:08 -07:00
gr . Close ( )
closeOnDefer = false
2022-02-10 10:16:52 -08:00
rinfo . ReplicationAction = rAction
rinfo . ReplicationStatus = replication . Completed
2021-04-29 16:46:26 -07:00
}
2020-07-21 17:49:56 -07:00
return
}
}
2021-09-18 16:31:35 -04:00
// Optimistically mark Completed; the failure paths below reset it to Failed.
rinfo . ReplicationStatus = replication . Completed
rinfo . Size = size
rinfo . ReplicationAction = rAction
2021-02-20 00:22:17 -08:00
// use core client to avoid doing multipart on PUT
c := & miniogo . Core { Client : tgt . Client }
2021-09-18 16:31:35 -04:00
if rAction != replicateAll {
2020-11-19 11:50:22 -08:00
// replicate metadata for object tagging/copy with metadata replacement
2021-02-10 17:25:04 -08:00
srcOpts := miniogo . CopySrcOptions {
2021-09-18 16:31:35 -04:00
Bucket : tgt . Bucket ,
2021-02-10 17:25:04 -08:00
Object : object ,
2021-04-03 09:03:42 -07:00
VersionID : objInfo . VersionID ,
}
2021-03-03 11:13:31 -08:00
dstOpts := miniogo . PutObjectOptions {
Internal : miniogo . AdvancedPutOptions {
SourceVersionID : objInfo . VersionID ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
2022-01-02 09:15:06 -08:00
} ,
}
2021-09-18 16:31:35 -04:00
// Source and destination of the copy are the same object on the target;
// only the metadata passed in is replaced.
if _ , err = c . CopyObject ( ctx , tgt . Bucket , object , tgt . Bucket , object , getCopyObjMetadata ( objInfo , tgt . StorageClass ) , srcOpts , dstOpts ) ; err != nil {
rinfo . ReplicationStatus = replication . Failed
2021-02-09 15:11:43 -08:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate metadata for object %s/%s(%s): %s" , bucket , objInfo . Name , objInfo . VersionID , err ) )
2021-01-06 16:13:10 -08:00
}
} else {
2021-09-18 16:31:35 -04:00
var putOpts minio . PutObjectOptions
putOpts , err = putReplicationOpts ( ctx , tgt . StorageClass , objInfo )
2021-02-08 16:19:05 -08:00
if err != nil {
2021-09-18 16:31:35 -04:00
// NOTE(review): the message says "failed to get target", but the call
// that failed is putReplicationOpts - confirm the wording.
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for replication bucket:%s err:%w" , bucket , err ) )
2021-02-08 18:12:28 -08:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2021-02-08 16:19:05 -08:00
return
}
2021-01-06 16:13:10 -08:00
// Sum of header key lengths plus the number of values per key (len(v) is
// the length of the value slice); passed to the bandwidth monitor options.
var headerSize int
for k , v := range putOpts . Header ( ) {
headerSize += len ( k ) + len ( v )
}
2021-01-08 10:12:26 -08:00
2021-04-05 16:07:53 -07:00
opts := & bandwidth . MonitorReaderOptions {
2021-06-24 18:29:30 -07:00
Bucket : objInfo . Bucket ,
HeaderSize : headerSize ,
2021-04-05 16:07:53 -07:00
}
2021-07-28 15:20:01 -07:00
// When the bucket is throttled, the monitored reader gets a context that
// expires after throttleDeadline.
newCtx := ctx
if globalBucketMonitor . IsThrottled ( bucket ) {
var cancel context . CancelFunc
newCtx , cancel = context . WithTimeout ( ctx , throttleDeadline )
defer cancel ( )
}
2021-06-24 18:29:30 -07:00
r := bandwidth . NewMonitoredReader ( newCtx , globalBucketMonitor , gr , opts )
2021-09-08 22:25:23 -07:00
if objInfo . isMultipart ( ) {
2021-09-18 16:31:35 -04:00
if err := replicateObjectWithMultipart ( ctx , c , tgt . Bucket , object ,
2021-07-28 22:11:55 -07:00
r , objInfo , putOpts ) ; err != nil {
2021-09-18 16:31:35 -04:00
rinfo . ReplicationStatus = replication . Failed
2021-06-30 07:44:24 -07:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate for object %s/%s(%s): %s" , bucket , objInfo . Name , objInfo . VersionID , err ) )
}
} else {
2021-09-18 16:31:35 -04:00
if _ , err = c . PutObject ( ctx , tgt . Bucket , object , r , size , "" , "" , putOpts ) ; err != nil {
rinfo . ReplicationStatus = replication . Failed
2021-06-30 07:44:24 -07:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate for object %s/%s(%s): %s" , bucket , objInfo . Name , objInfo . VersionID , err ) )
}
2021-01-06 16:13:10 -08:00
}
2020-07-21 17:49:56 -07:00
}
2021-06-28 23:58:08 -07:00
// Close the reader explicitly and tell the deferred cleanup not to repeat it.
gr . Close ( )
closeOnDefer = false
2021-09-18 16:31:35 -04:00
return
2020-07-21 17:49:56 -07:00
}
2020-08-12 17:32:24 -07:00
2021-07-28 22:11:55 -07:00
func replicateObjectWithMultipart ( ctx context . Context , c * miniogo . Core , bucket , object string , r io . Reader , objInfo ObjectInfo , opts miniogo . PutObjectOptions ) ( err error ) {
2021-06-30 07:44:24 -07:00
var uploadedParts [ ] miniogo . CompletePart
2021-07-28 22:11:55 -07:00
uploadID , err := c . NewMultipartUpload ( context . Background ( ) , bucket , object , opts )
2021-06-30 07:44:24 -07:00
if err != nil {
2021-07-28 22:11:55 -07:00
return err
2021-06-30 07:44:24 -07:00
}
2021-07-28 22:11:55 -07:00
defer func ( ) {
if err != nil {
// block and abort remote upload upon failure.
2022-01-11 22:32:29 -08:00
attempts := 1
for attempts <= 3 {
aerr := c . AbortMultipartUpload ( ctx , bucket , object , uploadID )
if aerr == nil {
return
}
logger . LogIf ( ctx ,
fmt . Errorf ( "Trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster" ,
humanize . Ordinal ( attempts ) , uploadID , bucket , object , aerr ) )
attempts ++
time . Sleep ( time . Second )
2021-07-28 22:11:55 -07:00
}
}
} ( )
2021-06-30 07:44:24 -07:00
var (
hr * hash . Reader
pInfo miniogo . ObjectPart
)
2021-07-28 22:11:55 -07:00
2021-06-30 07:44:24 -07:00
for _ , partInfo := range objInfo . Parts {
2021-08-24 17:41:05 -04:00
hr , err = hash . NewReader ( r , partInfo . ActualSize , "" , "" , partInfo . ActualSize )
2021-06-30 07:44:24 -07:00
if err != nil {
2021-07-28 22:11:55 -07:00
return err
2021-06-30 07:44:24 -07:00
}
2021-08-24 17:41:05 -04:00
pInfo , err = c . PutObjectPart ( ctx , bucket , object , uploadID , partInfo . Number , hr , partInfo . ActualSize , "" , "" , opts . ServerSideEncryption )
2021-06-30 07:44:24 -07:00
if err != nil {
2021-07-28 22:11:55 -07:00
return err
2021-06-30 07:44:24 -07:00
}
2021-08-24 17:41:05 -04:00
if pInfo . Size != partInfo . ActualSize {
return fmt . Errorf ( "Part size mismatch: got %d, want %d" , pInfo . Size , partInfo . ActualSize )
2021-06-30 07:44:24 -07:00
}
uploadedParts = append ( uploadedParts , miniogo . CompletePart {
PartNumber : pInfo . PartNumber ,
ETag : pInfo . ETag ,
} )
}
2021-07-28 22:11:55 -07:00
_ , err = c . CompleteMultipartUpload ( ctx , bucket , object , uploadID , uploadedParts , miniogo . PutObjectOptions {
Internal : miniogo . AdvancedPutOptions {
SourceMTime : objInfo . ModTime ,
// always set this to distinguish between `mc mirror` replication and serverside
ReplicationRequest : true ,
2022-01-02 09:15:06 -08:00
} ,
} )
2021-07-28 22:11:55 -07:00
return err
2021-06-30 07:44:24 -07:00
}
2020-08-12 17:32:24 -07:00
// filterReplicationStatusMetadata returns metadata without the bucket
// replication status entry, for use with COPY.
//
// The input map is never modified: when the entry is absent the input map
// itself is returned, otherwise a new map holding every other entry is built.
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
	if _, present := metadata[xhttp.AmzBucketReplicationStatus]; !present {
		return metadata
	}

	filtered := make(map[string]string, len(metadata))
	for key, val := range metadata {
		if key == xhttp.AmzBucketReplicationStatus {
			continue
		}
		filtered[key] = val
	}
	return filtered
}
2020-09-16 16:04:55 -07:00
2021-06-01 19:59:11 -07:00
// DeletedObjectReplicationInfo has info on deleted object
type DeletedObjectReplicationInfo struct {
2020-11-19 18:43:58 -08:00
DeletedObject
2021-09-18 16:31:35 -04:00
Bucket string
2022-07-12 10:43:32 -07:00
EventType string
2021-09-18 16:31:35 -04:00
OpType replication . Type
ResetID string
TargetArn string
2020-11-19 18:43:58 -08:00
}
2022-08-22 16:53:06 -07:00
// ToMRFEntry returns the relevant info needed by MRF.
// The entry carries the delete marker version ID when one is set and falls
// back to the object's version ID otherwise.
func (di DeletedObjectReplicationInfo) ToMRFEntry() MRFReplicateEntry {
	entry := MRFReplicateEntry{
		Bucket:    di.Bucket,
		Object:    di.ObjectName,
		versionID: di.DeleteMarkerVersionID,
	}
	if entry.versionID == "" {
		entry.versionID = di.VersionID
	}
	return entry
}
2021-07-01 14:02:44 -07:00
// Replication specific APIName values used in audit log entries.
const (
	// ReplicateObjectAPI - API name recorded when an object is replicated.
	ReplicateObjectAPI = "ReplicateObject"
	// ReplicateDeleteAPI - API name for delete replication.
	ReplicateDeleteAPI = "ReplicateDelete"
)
2021-06-28 23:58:08 -07:00
// Audit trail event names for replication operations.
const (
	// ReplicateQueued - replication being queued trail
	ReplicateQueued = "replicate:queue"

	// ReplicateExisting - audit trail for existing objects replication
	ReplicateExisting = "replicate:existing"
	// ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers
	ReplicateExistingDelete = "replicate:existing:delete"

	// ReplicateIncoming - audit trail of inline replication
	ReplicateIncoming = "replicate:incoming"
	// ReplicateIncomingDelete - audit trail of inline replication of deletes.
	ReplicateIncomingDelete = "replicate:incoming:delete"

	// ReplicateHeal - audit trail for healing of failed/pending replications
	ReplicateHeal = "replicate:heal"
	// ReplicateHealDelete - audit trail of healing of failed/pending delete replications.
	ReplicateHealDelete = "replicate:heal:delete"

	// ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
	ReplicateMRF = "replicate:mrf"
)
2020-09-21 13:43:29 -07:00
var (
2021-04-03 09:03:42 -07:00
globalReplicationPool * ReplicationPool
globalReplicationStats * ReplicationStats
2020-09-21 13:43:29 -07:00
)
2020-09-16 16:04:55 -07:00
2021-03-09 02:56:42 -08:00
// ReplicationPool describes replication pool
type ReplicationPool struct {
2021-06-01 19:59:11 -07:00
objLayer ObjectLayer
ctx context . Context
mrfWorkerKillCh chan struct { }
workerKillCh chan struct { }
replicaCh chan ReplicateObjectInfo
replicaDeleteCh chan DeletedObjectReplicationInfo
mrfReplicaCh chan ReplicateObjectInfo
existingReplicaCh chan ReplicateObjectInfo
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
2022-08-22 16:53:06 -07:00
mrfSaveCh chan MRFReplicateEntry
workerSize int
mrfWorkerSize int
resyncState replicationResyncState
workerWg sync . WaitGroup
mrfWorkerWg sync . WaitGroup
once sync . Once
mu sync . Mutex
mrfMutex sync . Mutex
2021-03-09 02:56:42 -08:00
}
// NewReplicationPool creates a pool of replication workers of specified size
2021-04-23 21:58:45 -07:00
func NewReplicationPool ( ctx context . Context , o ObjectLayer , opts replicationPoolOpts ) * ReplicationPool {
2021-03-09 02:56:42 -08:00
pool := & ReplicationPool {
2021-06-01 19:59:11 -07:00
replicaCh : make ( chan ReplicateObjectInfo , 100000 ) ,
replicaDeleteCh : make ( chan DeletedObjectReplicationInfo , 100000 ) ,
mrfReplicaCh : make ( chan ReplicateObjectInfo , 100000 ) ,
workerKillCh : make ( chan struct { } , opts . Workers ) ,
mrfWorkerKillCh : make ( chan struct { } , opts . FailedWorkers ) ,
existingReplicaCh : make ( chan ReplicateObjectInfo , 100000 ) ,
existingReplicaDeleteCh : make ( chan DeletedObjectReplicationInfo , 100000 ) ,
2022-02-10 10:16:52 -08:00
resyncState : replicationResyncState { statusMap : make ( map [ string ] BucketReplicationResyncStatus ) } ,
2022-08-22 16:53:06 -07:00
mrfSaveCh : make ( chan MRFReplicateEntry , 100000 ) ,
2021-06-01 19:59:11 -07:00
ctx : ctx ,
objLayer : o ,
2021-04-03 09:03:42 -07:00
}
2021-05-28 13:28:37 -07:00
2021-04-23 21:58:45 -07:00
pool . ResizeWorkers ( opts . Workers )
pool . ResizeFailedWorkers ( opts . FailedWorkers )
2021-06-01 19:59:11 -07:00
go pool . AddExistingObjectReplicateWorker ( )
2022-07-12 10:43:32 -07:00
go pool . updateResyncStatus ( ctx , o )
2022-08-22 16:53:06 -07:00
go pool . processMRF ( )
go pool . persistMRF ( )
2021-03-09 02:56:42 -08:00
return pool
2020-09-16 16:04:55 -07:00
}
2021-04-03 09:03:42 -07:00
// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
// to the other workers
func ( p * ReplicationPool ) AddMRFWorker ( ) {
2022-07-12 10:43:32 -07:00
defer p . mrfWorkerWg . Done ( )
2021-04-03 09:03:42 -07:00
for {
select {
case <- p . ctx . Done ( ) :
return
case oi , ok := <- p . mrfReplicaCh :
if ! ok {
return
}
2022-07-12 10:43:32 -07:00
replicateObject ( p . ctx , oi , p . objLayer )
2021-05-28 13:28:37 -07:00
case <- p . mrfWorkerKillCh :
return
2021-04-03 09:03:42 -07:00
}
}
}
2021-03-09 02:56:42 -08:00
// AddWorker adds a replication worker to the pool
func ( p * ReplicationPool ) AddWorker ( ) {
2021-04-23 21:58:45 -07:00
defer p . workerWg . Done ( )
2021-03-09 02:56:42 -08:00
for {
select {
case <- p . ctx . Done ( ) :
return
case oi , ok := <- p . replicaCh :
if ! ok {
2020-09-16 16:04:55 -07:00
return
}
2022-07-12 10:43:32 -07:00
replicateObject ( p . ctx , oi , p . objLayer )
2021-03-09 02:56:42 -08:00
case doi , ok := <- p . replicaDeleteCh :
if ! ok {
return
}
2022-07-12 10:43:32 -07:00
replicateDelete ( p . ctx , doi , p . objLayer )
2021-04-23 21:58:45 -07:00
case <- p . workerKillCh :
2021-03-09 02:56:42 -08:00
return
2020-09-16 16:04:55 -07:00
}
2021-03-09 02:56:42 -08:00
}
2020-09-16 16:04:55 -07:00
}
2020-09-21 13:43:29 -07:00
2021-06-01 19:59:11 -07:00
// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd
func ( p * ReplicationPool ) AddExistingObjectReplicateWorker ( ) {
for {
select {
case <- p . ctx . Done ( ) :
return
case oi , ok := <- p . existingReplicaCh :
if ! ok {
return
}
2022-07-12 10:43:32 -07:00
replicateObject ( p . ctx , oi , p . objLayer )
2021-06-01 19:59:11 -07:00
case doi , ok := <- p . existingReplicaDeleteCh :
if ! ok {
return
}
2022-07-12 10:43:32 -07:00
replicateDelete ( p . ctx , doi , p . objLayer )
2021-06-01 19:59:11 -07:00
}
}
}
2021-04-23 21:58:45 -07:00
// ResizeWorkers sets replication workers pool to new size
func ( p * ReplicationPool ) ResizeWorkers ( n int ) {
2021-03-09 02:56:42 -08:00
p . mu . Lock ( )
defer p . mu . Unlock ( )
2021-04-23 21:58:45 -07:00
for p . workerSize < n {
p . workerSize ++
p . workerWg . Add ( 1 )
2021-03-09 02:56:42 -08:00
go p . AddWorker ( )
}
2021-04-23 21:58:45 -07:00
for p . workerSize > n {
p . workerSize --
go func ( ) { p . workerKillCh <- struct { } { } } ( )
}
}
// ResizeFailedWorkers sets replication failed workers pool size
func ( p * ReplicationPool ) ResizeFailedWorkers ( n int ) {
p . mu . Lock ( )
defer p . mu . Unlock ( )
for p . mrfWorkerSize < n {
p . mrfWorkerSize ++
p . mrfWorkerWg . Add ( 1 )
go p . AddMRFWorker ( )
}
for p . mrfWorkerSize > n {
p . mrfWorkerSize --
go func ( ) { p . mrfWorkerKillCh <- struct { } { } } ( )
2021-03-09 02:56:42 -08:00
}
}
2021-12-13 18:22:56 -08:00
// suggestedWorkers recommends an increase in number of workers to meet replication load.
// The recommendation is the current size of the selected worker set (MRF
// workers when failQueue is true, regular workers otherwise) scaled by
// ReplicationWorkerMultiplier and truncated to an int.
func (p *ReplicationPool) suggestedWorkers(failQueue bool) int {
	current := p.workerSize
	if failQueue {
		current = p.mrfWorkerSize
	}
	return int(float64(current) * ReplicationWorkerMultiplier)
}
2021-04-29 18:20:39 -07:00
func ( p * ReplicationPool ) queueReplicaTask ( ri ReplicateObjectInfo ) {
if p == nil {
return
}
2022-08-22 16:53:06 -07:00
var ch , healCh chan ReplicateObjectInfo
2021-06-01 19:59:11 -07:00
switch ri . OpType {
case replication . ExistingObjectReplicationType :
ch = p . existingReplicaCh
2021-06-28 23:58:08 -07:00
case replication . HealReplicationType :
2022-08-22 16:53:06 -07:00
ch = p . mrfReplicaCh
healCh = p . replicaCh
2021-06-01 19:59:11 -07:00
default :
ch = p . replicaCh
}
2021-04-29 18:20:39 -07:00
select {
case <- GlobalContext . Done ( ) :
p . once . Do ( func ( ) {
close ( p . replicaCh )
close ( p . mrfReplicaCh )
2021-06-01 19:59:11 -07:00
close ( p . existingReplicaCh )
2021-04-29 18:20:39 -07:00
} )
2022-08-22 16:53:06 -07:00
case healCh <- ri :
2021-06-01 19:59:11 -07:00
case ch <- ri :
2021-04-29 18:20:39 -07:00
default :
2022-07-27 09:44:59 -07:00
logger . LogOnceIf ( GlobalContext , fmt . Errorf ( "WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`" , p . suggestedWorkers ( false ) ) , string ( replicationSubsystem ) )
2021-04-29 18:20:39 -07:00
}
}
2021-09-18 16:31:35 -04:00
// queueReplicateDeletesWrapper queues doi on the global replication pool once
// for every target in existingObjectResync that is marked for replication.
// Each queued copy carries that target's ARN and reset ID.
func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObjectResync ResyncDecision) {
	for arn, decision := range existingObjectResync.targets {
		if !decision.Replicate {
			continue
		}
		doi.TargetArn = arn
		doi.ResetID = decision.ResetID
		globalReplicationPool.queueReplicaDeleteTask(doi)
	}
}
2021-06-01 19:59:11 -07:00
func ( p * ReplicationPool ) queueReplicaDeleteTask ( doi DeletedObjectReplicationInfo ) {
2021-03-09 02:56:42 -08:00
if p == nil {
return
2020-09-21 13:43:29 -07:00
}
2021-06-01 19:59:11 -07:00
var ch chan DeletedObjectReplicationInfo
switch doi . OpType {
case replication . ExistingObjectReplicationType :
ch = p . existingReplicaDeleteCh
2021-06-28 23:58:08 -07:00
case replication . HealReplicationType :
2021-07-01 14:02:44 -07:00
fallthrough
2021-06-01 19:59:11 -07:00
default :
ch = p . replicaDeleteCh
}
2021-03-09 02:56:42 -08:00
select {
2021-04-29 18:20:39 -07:00
case <- GlobalContext . Done ( ) :
2021-04-16 14:09:25 -07:00
p . once . Do ( func ( ) {
close ( p . replicaDeleteCh )
2021-06-01 19:59:11 -07:00
close ( p . existingReplicaDeleteCh )
2021-04-16 14:09:25 -07:00
} )
2021-06-01 19:59:11 -07:00
case ch <- doi :
2021-03-09 02:56:42 -08:00
default :
2022-07-27 09:44:59 -07:00
logger . LogOnceIf ( GlobalContext , fmt . Errorf ( "WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`" , p . suggestedWorkers ( false ) ) , string ( replicationSubsystem ) )
2021-03-09 02:56:42 -08:00
}
}
2021-04-23 21:58:45 -07:00
// replicationPoolOpts carries the sizing options handed to NewReplicationPool
// when the replication pool is created.
type replicationPoolOpts struct {
// Workers is filled from the API config's replication workers setting.
Workers int
// FailedWorkers is filled from the API config's replication failed workers setting.
FailedWorkers int
}
2021-03-09 02:56:42 -08:00
func initBackgroundReplication ( ctx context . Context , objectAPI ObjectLayer ) {
2021-04-23 21:58:45 -07:00
globalReplicationPool = NewReplicationPool ( ctx , objectAPI , replicationPoolOpts {
Workers : globalAPIConfig . getReplicationWorkers ( ) ,
FailedWorkers : globalAPIConfig . getReplicationFailedWorkers ( ) ,
} )
2021-04-03 09:03:42 -07:00
globalReplicationStats = NewReplicationStats ( ctx , objectAPI )
2021-10-21 21:52:55 -04:00
go globalReplicationStats . loadInitialReplicationMetrics ( ctx )
2020-09-21 13:43:29 -07:00
}
2021-01-11 22:36:51 -08:00
2022-03-08 13:58:55 -08:00
// proxyResult reports the outcome of trying to serve a request from a
// replication target.
type proxyResult struct {
// Proxy is true when a replication target was found that can serve the request.
Proxy bool
// Err carries the target's error when the lookup was rejected for an invalid range.
Err error
}
2021-01-11 22:36:51 -08:00
// get Reader from replication target if active-active replication is in place and
// this node returns a 404
2022-03-08 13:58:55 -08:00
func proxyGetToReplicationTarget ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , h http . Header , opts ObjectOptions , proxyTargets * madmin . BucketTargets ) ( gr * GetObjectReader , proxy proxyResult , err error ) {
tgt , oi , proxy := proxyHeadToRepTarget ( ctx , bucket , object , rs , opts , proxyTargets )
if ! proxy . Proxy {
return nil , proxy , nil
2021-01-11 22:36:51 -08:00
}
2022-03-08 13:58:55 -08:00
fn , _ , _ , err := NewGetObjectReader ( nil , oi , opts )
2021-01-11 22:36:51 -08:00
if err != nil {
2022-03-08 13:58:55 -08:00
return nil , proxy , err
2021-01-11 22:36:51 -08:00
}
gopts := miniogo . GetObjectOptions {
VersionID : opts . VersionID ,
ServerSideEncryption : opts . ServerSideEncryption ,
Internal : miniogo . AdvancedGetOptions {
2021-01-27 11:22:34 -08:00
ReplicationProxyRequest : "true" ,
2021-01-11 22:36:51 -08:00
} ,
2022-03-08 13:58:55 -08:00
PartNumber : opts . PartNumber ,
2021-01-11 22:36:51 -08:00
}
// get correct offsets for encrypted object
2022-03-08 13:58:55 -08:00
if rs != nil {
h , err := rs . ToHeader ( )
if err != nil {
return nil , proxy , err
2021-01-11 22:36:51 -08:00
}
2022-03-08 13:58:55 -08:00
gopts . Set ( xhttp . Range , h )
2021-01-11 22:36:51 -08:00
}
2021-02-03 20:41:33 -08:00
// Make sure to match ETag when proxying.
if err = gopts . SetMatchETag ( oi . ETag ) ; err != nil {
2022-03-08 13:58:55 -08:00
return nil , proxy , err
2021-02-03 20:41:33 -08:00
}
2021-01-11 22:36:51 -08:00
c := miniogo . Core { Client : tgt . Client }
2022-03-08 13:58:55 -08:00
obj , _ , h , err := c . GetObject ( ctx , bucket , object , gopts )
2021-01-11 22:36:51 -08:00
if err != nil {
2022-03-08 13:58:55 -08:00
return nil , proxy , err
2021-01-11 22:36:51 -08:00
}
closeReader := func ( ) { obj . Close ( ) }
2021-06-24 09:44:00 -07:00
reader , err := fn ( obj , h , closeReader )
2021-01-11 22:36:51 -08:00
if err != nil {
2022-03-08 13:58:55 -08:00
return nil , proxy , err
2021-01-11 22:36:51 -08:00
}
2021-02-10 17:25:04 -08:00
reader . ObjInfo = oi . Clone ( )
2022-03-08 13:58:55 -08:00
if rs != nil {
contentSize , err := parseSizeFromContentRange ( h )
if err != nil {
return nil , proxy , err
}
reader . ObjInfo . Size = contentSize
}
return reader , proxyResult { Proxy : true } , nil
2021-01-11 22:36:51 -08:00
}
2022-05-08 16:50:31 -07:00
func getProxyTargets ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( tgts * madmin . BucketTargets ) {
if opts . VersionSuspended {
return & madmin . BucketTargets { }
}
2021-01-11 22:36:51 -08:00
cfg , err := getReplicationConfig ( ctx , bucket )
2021-09-18 16:31:35 -04:00
if err != nil || cfg == nil {
return & madmin . BucketTargets { }
}
topts := replication . ObjectOpts { Name : object }
tgtArns := cfg . FilterTargetArns ( topts )
tgts = & madmin . BucketTargets { Targets : make ( [ ] madmin . BucketTarget , len ( tgtArns ) ) }
for i , tgtArn := range tgtArns {
tgt := globalBucketTargetSys . GetRemoteBucketTargetByArn ( ctx , bucket , tgtArn )
tgts . Targets [ i ] = tgt
2021-01-11 22:36:51 -08:00
}
2021-09-18 16:31:35 -04:00
return tgts
2021-01-11 22:36:51 -08:00
}
2021-01-27 11:22:34 -08:00
2022-03-08 13:58:55 -08:00
func proxyHeadToRepTarget ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , opts ObjectOptions , proxyTargets * madmin . BucketTargets ) ( tgt * TargetClient , oi ObjectInfo , proxy proxyResult ) {
2021-01-11 22:36:51 -08:00
// this option is set when active-active replication is in place between site A -> B,
// and site B does not have the object yet.
2021-01-27 11:22:34 -08:00
if opts . ProxyRequest || ( opts . ProxyHeaderSet && ! opts . ProxyRequest ) { // true only when site B sets MinIOSourceProxyRequest header
2022-03-08 13:58:55 -08:00
return nil , oi , proxy
2021-01-11 22:36:51 -08:00
}
2021-09-18 16:31:35 -04:00
for _ , t := range proxyTargets . Targets {
tgt = globalBucketTargetSys . GetRemoteTargetClient ( ctx , t . Arn )
2022-08-16 17:46:22 -07:00
if tgt == nil || globalBucketTargetSys . isOffline ( tgt . EndpointURL ( ) ) {
2021-09-18 16:31:35 -04:00
continue
}
// if proxying explicitly disabled on remote target
if tgt . disableProxy {
continue
}
2021-01-11 22:36:51 -08:00
2021-09-18 16:31:35 -04:00
gopts := miniogo . GetObjectOptions {
VersionID : opts . VersionID ,
ServerSideEncryption : opts . ServerSideEncryption ,
Internal : miniogo . AdvancedGetOptions {
ReplicationProxyRequest : "true" ,
} ,
2022-03-08 13:58:55 -08:00
PartNumber : opts . PartNumber ,
2021-09-18 16:31:35 -04:00
}
2022-03-08 13:58:55 -08:00
if rs != nil {
h , err := rs . ToHeader ( )
if err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Invalid range header for %s/%s(%s) - %w" , bucket , object , opts . VersionID , err ) )
continue
}
gopts . Set ( xhttp . Range , h )
}
2021-09-18 16:31:35 -04:00
objInfo , err := tgt . StatObject ( ctx , t . TargetBucket , object , gopts )
if err != nil {
2022-03-08 13:58:55 -08:00
if isErrInvalidRange ( ErrorRespToObjectError ( err , bucket , object ) ) {
return nil , oi , proxyResult { Err : err }
}
2021-09-18 16:31:35 -04:00
continue
}
tags , _ := tags . MapToObjectTags ( objInfo . UserTags )
oi = ObjectInfo {
Bucket : bucket ,
Name : object ,
ModTime : objInfo . LastModified ,
Size : objInfo . Size ,
ETag : objInfo . ETag ,
VersionID : objInfo . VersionID ,
IsLatest : objInfo . IsLatest ,
DeleteMarker : objInfo . IsDeleteMarker ,
ContentType : objInfo . ContentType ,
Expires : objInfo . Expires ,
StorageClass : objInfo . StorageClass ,
ReplicationStatusInternal : objInfo . ReplicationStatus ,
UserTags : tags . String ( ) ,
}
oi . UserDefined = make ( map [ string ] string , len ( objInfo . Metadata ) )
for k , v := range objInfo . Metadata {
oi . UserDefined [ k ] = v [ 0 ]
}
ce , ok := oi . UserDefined [ xhttp . ContentEncoding ]
if ! ok {
ce , ok = oi . UserDefined [ strings . ToLower ( xhttp . ContentEncoding ) ]
}
if ok {
oi . ContentEncoding = ce
}
2022-03-08 13:58:55 -08:00
return tgt , oi , proxyResult { Proxy : true }
2021-01-11 22:36:51 -08:00
}
2022-03-08 13:58:55 -08:00
return nil , oi , proxy
2021-01-11 22:36:51 -08:00
}
// get object info from replication target if active-active replication is in place and
// this node returns a 404
2022-03-08 13:58:55 -08:00
func proxyHeadToReplicationTarget ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , opts ObjectOptions , proxyTargets * madmin . BucketTargets ) ( oi ObjectInfo , proxy proxyResult ) {
_ , oi , proxy = proxyHeadToRepTarget ( ctx , bucket , object , rs , opts , proxyTargets )
2021-09-18 16:31:35 -04:00
return oi , proxy
2021-01-11 22:36:51 -08:00
}
2021-09-18 16:31:35 -04:00
func scheduleReplication ( ctx context . Context , objInfo ObjectInfo , o ObjectLayer , dsc ReplicateDecision , opType replication . Type ) {
2022-07-12 10:43:32 -07:00
ri := ReplicateObjectInfo { ObjectInfo : objInfo , OpType : opType , Dsc : dsc , EventType : ReplicateIncoming }
2021-09-18 16:31:35 -04:00
if dsc . Synchronous ( ) {
2022-07-12 10:43:32 -07:00
replicateObject ( ctx , ri , o )
2021-01-11 22:36:51 -08:00
} else {
2022-07-12 10:43:32 -07:00
globalReplicationPool . queueReplicaTask ( ri )
2021-04-03 09:03:42 -07:00
}
if sz , err := objInfo . GetActualSize ( ) ; err == nil {
2021-09-18 16:31:35 -04:00
for arn := range dsc . targetsMap {
2021-11-17 21:10:57 +01:00
globalReplicationStats . Update ( objInfo . Bucket , arn , sz , 0 , objInfo . ReplicationStatus , replication . StatusType ( "" ) , opType )
2021-09-18 16:31:35 -04:00
}
2021-01-11 22:36:51 -08:00
}
}
2021-09-18 16:31:35 -04:00
func scheduleReplicationDelete ( ctx context . Context , dv DeletedObjectReplicationInfo , o ObjectLayer ) {
2021-04-29 18:20:39 -07:00
globalReplicationPool . queueReplicaDeleteTask ( dv )
2021-09-18 16:31:35 -04:00
for arn := range dv . ReplicationState . Targets {
2021-11-17 21:10:57 +01:00
globalReplicationStats . Update ( dv . Bucket , arn , 0 , 0 , replication . Pending , replication . StatusType ( "" ) , replication . DeleteReplicationType )
2021-09-18 16:31:35 -04:00
}
for arn := range dv . ReplicationState . PurgeTargets {
2021-11-17 21:10:57 +01:00
globalReplicationStats . Update ( dv . Bucket , arn , 0 , 0 , replication . Pending , replication . StatusType ( "" ) , replication . DeleteReplicationType )
2021-09-18 16:31:35 -04:00
}
2021-01-11 22:36:51 -08:00
}
2021-06-01 19:59:11 -07:00
type replicationConfig struct {
2021-09-18 16:31:35 -04:00
Config * replication . Config
remotes * madmin . BucketTargets
2021-06-01 19:59:11 -07:00
}
// Empty reports whether no replication config is set.
func (c replicationConfig) Empty() bool {
	hasConfig := c.Config != nil
	return !hasConfig
}
2022-01-02 09:15:06 -08:00
2021-06-01 19:59:11 -07:00
// Replicate reports the replication config's verdict for opts. It does not
// check Empty first.
func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
	rules := c.Config
	return rules.Replicate(opts)
}
// Resync returns true if replication reset is requested
2021-09-18 16:31:35 -04:00
func ( c replicationConfig ) Resync ( ctx context . Context , oi ObjectInfo , dsc * ReplicateDecision , tgtStatuses map [ string ] replication . StatusType ) ( r ResyncDecision ) {
2021-06-01 19:59:11 -07:00
if c . Empty ( ) {
2021-09-18 16:31:35 -04:00
return
2021-06-01 19:59:11 -07:00
}
2021-09-18 16:31:35 -04:00
// Now overlay existing object replication choices for target
2021-06-01 19:59:11 -07:00
if oi . DeleteMarker {
2021-09-18 16:31:35 -04:00
opts := replication . ObjectOpts {
2021-06-01 19:59:11 -07:00
Name : oi . Name ,
SSEC : crypto . SSEC . IsEncrypted ( oi . UserDefined ) ,
UserTags : oi . UserTags ,
DeleteMarker : oi . DeleteMarker ,
VersionID : oi . VersionID ,
OpType : replication . DeleteReplicationType ,
2022-01-02 09:15:06 -08:00
ExistingObject : true ,
}
2021-09-18 16:31:35 -04:00
tgtArns := c . Config . FilterTargetArns ( opts )
// indicates no matching target with Existing object replication enabled.
if len ( tgtArns ) == 0 {
return
2021-06-01 19:59:11 -07:00
}
2021-09-18 16:31:35 -04:00
for _ , t := range tgtArns {
opts . TargetArn = t
// Update replication decision for target based on existing object replciation rule.
dsc . Set ( newReplicateTargetDecision ( t , c . Replicate ( opts ) , false ) )
}
return c . resync ( oi , dsc , tgtStatuses )
}
// Ignore previous replication status when deciding if object can be re-replicated
objInfo := oi . Clone ( )
objInfo . ReplicationStatusInternal = ""
objInfo . VersionPurgeStatusInternal = ""
objInfo . ReplicationStatus = ""
objInfo . VersionPurgeStatus = ""
2022-07-28 13:43:02 -07:00
delete ( objInfo . UserDefined , xhttp . AmzBucketReplicationStatus )
2021-09-18 16:31:35 -04:00
resyncdsc := mustReplicate ( ctx , oi . Bucket , oi . Name , getMustReplicateOptions ( objInfo , replication . ExistingObjectReplicationType , ObjectOptions { } ) )
dsc = & resyncdsc
return c . resync ( oi , dsc , tgtStatuses )
2021-06-01 19:59:11 -07:00
}
// wrapper function for testability. Returns true if a new reset is requested on
// already replicated objects OR object qualifies for existing object replication
// and no reset requested.
2021-09-18 16:31:35 -04:00
func ( c replicationConfig ) resync ( oi ObjectInfo , dsc * ReplicateDecision , tgtStatuses map [ string ] replication . StatusType ) ( r ResyncDecision ) {
r = ResyncDecision {
targets : make ( map [ string ] ResyncTargetDecision ) ,
}
if c . remotes == nil {
return
}
for _ , tgt := range c . remotes . Targets {
d , ok := dsc . targetsMap [ tgt . Arn ]
if ! ok {
continue
}
if ! d . Replicate {
continue
}
r . targets [ d . Arn ] = resyncTarget ( oi , tgt . Arn , tgt . ResetID , tgt . ResetBeforeDate , tgtStatuses [ tgt . Arn ] )
}
return
}
// targetResetHeader returns the metadata key under which the replication reset
// status for the target arn is stored: the reserved replication-reset prefix,
// a dash, and the ARN.
func targetResetHeader(arn string) string {
	prefix := ReservedMetadataPrefixLower + ReplicationReset
	return fmt.Sprintf("%s-%s", prefix, arn)
}
func resyncTarget ( oi ObjectInfo , arn string , resetID string , resetBeforeDate time . Time , tgtStatus replication . StatusType ) ( rd ResyncTargetDecision ) {
rd = ResyncTargetDecision {
ResetID : resetID ,
ResetBeforeDate : resetBeforeDate ,
}
rs , ok := oi . UserDefined [ targetResetHeader ( arn ) ]
if ! ok {
2021-11-16 09:28:29 -08:00
rs , ok = oi . UserDefined [ xhttp . MinIOReplicationResetStatus ] // for backward compatibility
2021-06-01 19:59:11 -07:00
}
if ! ok { // existing object replication is enabled and object version is unreplicated so far.
2021-09-18 16:31:35 -04:00
if resetID != "" && oi . ModTime . Before ( resetBeforeDate ) { // trigger replication if `mc replicate reset` requested
rd . Replicate = true
return
2021-06-01 19:59:11 -07:00
}
2021-09-18 16:31:35 -04:00
// For existing object reset - this condition is needed
rd . Replicate = tgtStatus == ""
return
2021-06-01 19:59:11 -07:00
}
2021-09-18 16:31:35 -04:00
if resetID == "" || resetBeforeDate . Equal ( timeSentinel ) { // no reset in progress
return
2021-06-01 19:59:11 -07:00
}
2021-09-18 16:31:35 -04:00
2021-06-01 19:59:11 -07:00
// if already replicated, return true if a new reset was requested.
splits := strings . SplitN ( rs , ";" , 2 )
2021-09-18 16:31:35 -04:00
if len ( splits ) != 2 {
return
}
newReset := splits [ 1 ] != resetID
if ! newReset && tgtStatus == replication . Completed {
2021-06-01 19:59:11 -07:00
// already replicated and no reset requested
2021-09-18 16:31:35 -04:00
return
2021-06-01 19:59:11 -07:00
}
2021-09-18 16:31:35 -04:00
rd . Replicate = newReset && oi . ModTime . Before ( resetBeforeDate )
return
2021-06-01 19:59:11 -07:00
}
2021-11-19 14:46:14 -08:00
2022-05-23 09:15:30 -07:00
// getAllLatestReplicationStats computes replication stats for every bucket in
// bucketsUsage. The cluster-wide bucket stats are fetched once; for each
// bucket one entry per peer is passed to calculateBucketReplicationStats,
// with a zero-value entry for peers that report nothing for that bucket.
func getAllLatestReplicationStats(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) {
	peers := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
	bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage))
	for bucket, usage := range bucketsUsage {
		perPeer := make([]BucketStats, len(peers))
		for i, peer := range peers {
			if stat, found := peer[bucket]; found {
				perPeer[i] = stat
			}
		}
		bucketsReplicationStats[bucket] = calculateBucketReplicationStats(bucket, usage, perPeer)
	}
	return bucketsReplicationStats
}
func calculateBucketReplicationStats ( bucket string , u BucketUsageInfo , bucketStats [ ] BucketStats ) ( s BucketReplicationStats ) {
2021-11-19 14:46:14 -08:00
// accumulate cluster bucket stats
stats := make ( map [ string ] * BucketReplicationStat )
var totReplicaSize int64
for _ , bucketStat := range bucketStats {
totReplicaSize += bucketStat . ReplicationStats . ReplicaSize
for arn , stat := range bucketStat . ReplicationStats . Stats {
oldst := stats [ arn ]
if oldst == nil {
oldst = & BucketReplicationStat { }
}
stats [ arn ] = & BucketReplicationStat {
FailedCount : stat . FailedCount + oldst . FailedCount ,
FailedSize : stat . FailedSize + oldst . FailedSize ,
ReplicatedSize : stat . ReplicatedSize + oldst . ReplicatedSize ,
Latency : stat . Latency . merge ( oldst . Latency ) ,
}
}
}
// add initial usage stat to cluster stats
usageStat := globalReplicationStats . GetInitialUsage ( bucket )
totReplicaSize += usageStat . ReplicaSize
2021-12-17 15:33:13 -08:00
for arn , stat := range usageStat . Stats {
st , ok := stats [ arn ]
if ! ok {
st = & BucketReplicationStat { }
2021-11-19 14:46:14 -08:00
stats [ arn ] = st
}
2021-12-17 15:33:13 -08:00
st . ReplicatedSize += stat . ReplicatedSize
st . FailedSize += stat . FailedSize
st . FailedCount += stat . FailedCount
2021-11-19 14:46:14 -08:00
}
s = BucketReplicationStats {
Stats : make ( map [ string ] * BucketReplicationStat , len ( stats ) ) ,
}
var latestTotReplicatedSize int64
for _ , st := range u . ReplicationInfo {
latestTotReplicatedSize += int64 ( st . ReplicatedSize )
}
// normalize computed real time stats with latest usage stat
for arn , tgtstat := range stats {
st := BucketReplicationStat { }
bu , ok := u . ReplicationInfo [ arn ]
if ! ok {
bu = BucketTargetUsageInfo { }
}
// use in memory replication stats if it is ahead of usage info.
st . ReplicatedSize = int64 ( bu . ReplicatedSize )
if tgtstat . ReplicatedSize >= int64 ( bu . ReplicatedSize ) {
st . ReplicatedSize = tgtstat . ReplicatedSize
}
s . ReplicatedSize += st . ReplicatedSize
// Reset FailedSize and FailedCount to 0 for negative overflows which can
// happen since data usage picture can lag behind actual usage state at the time of cluster start
st . FailedSize = int64 ( math . Max ( float64 ( tgtstat . FailedSize ) , 0 ) )
st . FailedCount = int64 ( math . Max ( float64 ( tgtstat . FailedCount ) , 0 ) )
st . Latency = tgtstat . Latency
s . Stats [ arn ] = & st
s . FailedSize += st . FailedSize
s . FailedCount += st . FailedCount
}
// normalize overall stats
s . ReplicaSize = int64 ( math . Max ( float64 ( totReplicaSize ) , float64 ( u . ReplicaSize ) ) )
s . ReplicatedSize = int64 ( math . Max ( float64 ( s . ReplicatedSize ) , float64 ( latestTotReplicatedSize ) ) )
return s
}
2022-02-10 10:16:52 -08:00
2022-05-23 09:15:30 -07:00
// getLatestReplicationStats returns the most current replication stats for
// bucket, combining the cluster's in-memory bucket stats with the data usage
// info u.
func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
	clusterStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
	s = calculateBucketReplicationStats(bucket, u, clusterStats)
	return s
}
2022-07-12 10:43:32 -07:00
// resyncTimeInterval is how often updateResyncStatus saves resync status. It
// is also the window after a resync's end time during which that status is
// still saved.
const resyncTimeInterval = time . Minute * 1
2022-02-10 10:16:52 -08:00
2022-07-12 10:43:32 -07:00
// updateResyncStatus persists in-memory resync metadata stats to disk at periodic intervals
func ( p * ReplicationPool ) updateResyncStatus ( ctx context . Context , objectAPI ObjectLayer ) {
2022-02-10 10:16:52 -08:00
resyncTimer := time . NewTimer ( resyncTimeInterval )
defer resyncTimer . Stop ( )
for {
select {
case <- resyncTimer . C :
now := UTCNow ( )
p . resyncState . RLock ( )
for bucket , brs := range p . resyncState . statusMap {
var updt bool
for _ , st := range brs . TargetsMap {
// if resync in progress or just ended, needs to save to disk
if st . EndTime . Equal ( timeSentinel ) || now . Sub ( st . EndTime ) <= resyncTimeInterval {
updt = true
break
}
}
if updt {
brs . LastUpdate = now
if err := saveResyncStatus ( ctx , bucket , brs , objectAPI ) ; err != nil {
2022-08-04 16:10:08 -07:00
logger . LogIf ( ctx , fmt . Errorf ( "Could not save resync metadata to drive for %s - %w" , bucket , err ) )
2022-02-10 10:16:52 -08:00
continue
}
}
}
p . resyncState . RUnlock ( )
2022-05-17 22:42:59 -07:00
resyncTimer . Reset ( resyncTimeInterval )
2022-02-10 10:16:52 -08:00
case <- ctx . Done ( ) :
// server could be restarting - need
// to exit immediately
return
}
}
}
// resyncBucket resyncs all qualifying objects as per replication rules for the target
// ARN
func resyncBucket ( ctx context . Context , bucket , arn string , heal bool , objectAPI ObjectLayer ) {
resyncStatus := ResyncFailed
defer func ( ) {
globalReplicationPool . resyncState . Lock ( )
m := globalReplicationPool . resyncState . statusMap [ bucket ]
st := m . TargetsMap [ arn ]
st . EndTime = UTCNow ( )
st . ResyncStatus = resyncStatus
m . TargetsMap [ arn ] = st
2022-06-06 02:54:39 -07:00
globalReplicationPool . resyncState . statusMap [ bucket ] = m
2022-02-10 10:16:52 -08:00
globalReplicationPool . resyncState . Unlock ( )
} ( )
// Allocate new results channel to receive ObjectInfo.
objInfoCh := make ( chan ObjectInfo )
cfg , err := getReplicationConfig ( ctx , bucket )
if err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Replication resync of %s for arn %s failed with %w" , bucket , arn , err ) )
return
}
tgts , err := globalBucketTargetSys . ListBucketTargets ( ctx , bucket )
if err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Replication resync of %s for arn %s failed %w" , bucket , arn , err ) )
return
}
rcfg := replicationConfig {
Config : cfg ,
remotes : tgts ,
}
tgtArns := cfg . FilterTargetArns (
replication . ObjectOpts {
OpType : replication . ResyncReplicationType ,
TargetArn : arn ,
} )
if len ( tgtArns ) != 1 {
logger . LogIf ( ctx , fmt . Errorf ( "Replication resync failed for %s - arn specified %s is missing in the replication config" , bucket , arn ) )
return
}
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , arn )
if tgt == nil {
logger . LogIf ( ctx , fmt . Errorf ( "Replication resync failed for %s - target could not be created for arn %s" , bucket , arn ) )
return
}
2022-08-18 17:49:08 -07:00
// Walk through all object versions - Walk() is always in ascending order needed to ensure
// delete marker replicated to target after object version is first created.
if err := objectAPI . Walk ( ctx , bucket , "" , objInfoCh , ObjectOptions { } ) ; err != nil {
2022-02-10 10:16:52 -08:00
logger . LogIf ( ctx , err )
return
}
globalReplicationPool . resyncState . RLock ( )
m := globalReplicationPool . resyncState . statusMap [ bucket ]
st := m . TargetsMap [ arn ]
globalReplicationPool . resyncState . RUnlock ( )
var lastCheckpoint string
if st . ResyncStatus == ResyncStarted || st . ResyncStatus == ResyncFailed {
lastCheckpoint = st . Object
}
for obj := range objInfoCh {
if heal && lastCheckpoint != "" && lastCheckpoint != obj . Name {
continue
}
lastCheckpoint = ""
roi := getHealReplicateObjectInfo ( obj , rcfg )
if ! roi . ExistingObjResync . mustResync ( ) {
continue
}
if roi . DeleteMarker || ! roi . VersionPurgeStatus . Empty ( ) {
versionID := ""
dmVersionID := ""
if roi . VersionPurgeStatus . Empty ( ) {
dmVersionID = roi . VersionID
} else {
versionID = roi . VersionID
}
doi := DeletedObjectReplicationInfo {
DeletedObject : DeletedObject {
ObjectName : roi . Name ,
DeleteMarkerVersionID : dmVersionID ,
VersionID : versionID ,
ReplicationState : roi . getReplicationState ( roi . Dsc . String ( ) , versionID , true ) ,
DeleteMarkerMTime : DeleteMarkerMTime { roi . ModTime } ,
DeleteMarker : roi . DeleteMarker ,
} ,
2022-07-12 10:43:32 -07:00
Bucket : roi . Bucket ,
OpType : replication . ExistingObjectReplicationType ,
EventType : ReplicateExistingDelete ,
2022-02-10 10:16:52 -08:00
}
2022-07-12 10:43:32 -07:00
replicateDelete ( ctx , doi , objectAPI )
2022-02-10 10:16:52 -08:00
} else {
roi . OpType = replication . ExistingObjectReplicationType
2022-07-12 10:43:32 -07:00
roi . EventType = ReplicateExisting
replicateObject ( ctx , roi , objectAPI )
2022-02-10 10:16:52 -08:00
}
_ , err = tgt . StatObject ( ctx , tgt . Bucket , roi . Name , miniogo . StatObjectOptions {
VersionID : roi . VersionID ,
Internal : miniogo . AdvancedGetOptions {
ReplicationProxyRequest : "false" ,
} ,
} )
globalReplicationPool . resyncState . Lock ( )
m = globalReplicationPool . resyncState . statusMap [ bucket ]
st = m . TargetsMap [ arn ]
st . Object = roi . Name
if err != nil {
if roi . DeleteMarker && isErrMethodNotAllowed ( ErrorRespToObjectError ( err , bucket , roi . Name ) ) {
st . ReplicatedCount ++
} else {
st . FailedCount ++
}
} else {
st . ReplicatedCount ++
st . ReplicatedSize += roi . Size
}
m . TargetsMap [ arn ] = st
2022-06-06 02:54:39 -07:00
globalReplicationPool . resyncState . statusMap [ bucket ] = m
2022-02-10 10:16:52 -08:00
globalReplicationPool . resyncState . Unlock ( )
}
resyncStatus = ResyncCompleted
}
// startReplicationResync starts a replication resync of bucket for the remote
// target ARN specified.
//
// It fails when bucket or arn is empty, when the replication config cannot be
// loaded or has no rule for arn, when the saved resync metadata cannot be
// loaded, when the saved metadata already marks a resync as started for arn,
// or when the new status cannot be saved. On success the new status is
// recorded both on disk and in memory, and the resync itself runs in a
// background goroutine under GlobalContext.
func startReplicationResync(ctx context.Context, bucket, arn, resyncID string, resyncBeforeDate time.Time, objAPI ObjectLayer) error {
	switch {
	case bucket == "":
		return fmt.Errorf("bucket name is empty")
	case arn == "":
		return fmt.Errorf("target ARN specified for resync is empty")
	}

	// The ARN must be covered by the bucket's replication config.
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil {
		return err
	}
	filter := replication.ObjectOpts{
		OpType:    replication.ResyncReplicationType,
		TargetArn: arn,
	}
	if len(cfg.FilterTargetArns(filter)) == 0 {
		return fmt.Errorf("arn %s specified for resync not found in replication config", arn)
	}

	data, err := loadBucketResyncMetadata(ctx, bucket, objAPI)
	if err != nil {
		return err
	}
	// validate if resync is in progress for this arn
	if prior, found := data.TargetsMap[arn]; found && prior.ResyncStatus == ResyncStarted {
		return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", bucket, arn)
	}

	status := TargetReplicationResyncStatus{
		ResyncID:         resyncID,
		ResyncBeforeDate: resyncBeforeDate,
		StartTime:        UTCNow(),
		ResyncStatus:     ResyncStarted,
		Bucket:           bucket,
	}
	data.TargetsMap[arn] = status
	if err = saveResyncStatus(ctx, bucket, data, objAPI); err != nil {
		return err
	}

	// Mirror the new status in memory before kicking off the resync.
	globalReplicationPool.resyncState.Lock()
	defer globalReplicationPool.resyncState.Unlock()
	brs, found := globalReplicationPool.resyncState.statusMap[bucket]
	if !found {
		brs = BucketReplicationResyncStatus{
			Version:    resyncMetaVersion,
			TargetsMap: make(map[string]TargetReplicationResyncStatus),
		}
	}
	brs.TargetsMap[arn] = status
	globalReplicationPool.resyncState.statusMap[bucket] = brs

	go resyncBucket(GlobalContext, bucket, arn, false, objAPI)
	return nil
}
// deleteResyncMetadata removes bucket's entry from the in-memory replication
// resync state. A nil pool is a no-op.
func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket string) {
	if p == nil {
		return
	}
	p.resyncState.Lock()
	defer p.resyncState.Unlock()
	delete(p.resyncState.statusMap, bucket)
}
// initResync - initializes bucket replication resync for all buckets. The
// resync statuses are loaded in a background goroutine, so a nil return only
// means loading was started. It returns errServerNotInitialized when objAPI is
// nil.
func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
	if objAPI != nil {
		// Load the saved resync statuses in the background.
		go p.loadResync(ctx, buckets, objAPI)
		return nil
	}
	return errServerNotInitialized
}
// Loads bucket replication resync statuses into memory.
func ( p * ReplicationPool ) loadResync ( ctx context . Context , buckets [ ] BucketInfo , objAPI ObjectLayer ) {
for index := range buckets {
meta , err := loadBucketResyncMetadata ( ctx , buckets [ index ] . Name , objAPI )
if err != nil {
2022-07-13 16:29:10 -07:00
if ! errors . Is ( err , errVolumeNotFound ) {
2022-02-10 10:16:52 -08:00
logger . LogIf ( ctx , err )
}
2022-07-13 16:29:10 -07:00
continue
2022-02-10 10:16:52 -08:00
}
2022-07-13 16:29:10 -07:00
p . resyncState . Lock ( )
2022-02-10 10:16:52 -08:00
p . resyncState . statusMap [ buckets [ index ] . Name ] = meta
2022-07-13 16:29:10 -07:00
p . resyncState . Unlock ( )
2022-02-10 10:16:52 -08:00
}
for index := range buckets {
bucket := buckets [ index ] . Name
2022-07-13 16:29:10 -07:00
p . resyncState . RLock ( )
2022-02-10 10:16:52 -08:00
m , ok := p . resyncState . statusMap [ bucket ]
2022-07-13 16:29:10 -07:00
p . resyncState . RUnlock ( )
2022-02-10 10:16:52 -08:00
if ok {
for arn , st := range m . TargetsMap {
if st . ResyncStatus == ResyncFailed || st . ResyncStatus == ResyncStarted {
go resyncBucket ( ctx , bucket , arn , true , objAPI )
}
}
}
}
}
// loadBucketResyncMetadata loads bucket's resync metadata from disk.
//
// The stored file starts with a 4-byte header, a little-endian uint16 format
// followed by a little-endian uint16 version, and continues with the
// msgp-encoded status. A missing or empty file is not an error: the fresh
// status from newBucketResyncStatus is returned. An error is returned when the
// file cannot be read, holds no payload after the header, has an unknown
// format or version, or cannot be decoded.
func loadBucketResyncMetadata(ctx context.Context, bucket string, objAPI ObjectLayer) (brs BucketReplicationResyncStatus, e error) {
	brs = newBucketResyncStatus(bucket)

	resyncDirPath := path.Join(bucketMetaPrefix, bucket, replicationDir)
	// Read under the caller's context so cancellation is honored.
	data, err := readConfig(ctx, objAPI, pathJoin(resyncDirPath, resyncFileName))
	if err != nil && !errors.Is(err, errConfigNotFound) {
		return brs, err
	}
	if len(data) == 0 {
		// Seems to be empty.
		return brs, nil
	}
	if len(data) <= 4 {
		return brs, fmt.Errorf("replication resync: no data")
	}

	// Read resync meta header
	if format := binary.LittleEndian.Uint16(data[0:2]); format != resyncMetaFormat {
		return brs, fmt.Errorf("resyncMeta: unknown format: %d", format)
	}
	if version := binary.LittleEndian.Uint16(data[2:4]); version != resyncMetaVersion {
		return brs, fmt.Errorf("resyncMeta: unknown version: %d", version)
	}

	// OK, parse data.
	if _, err = brs.UnmarshalMsg(data[4:]); err != nil {
		return brs, err
	}

	// The decoded payload carries its own version, checked separately from
	// the header.
	if brs.Version != resyncMetaVersionV1 {
		return brs, fmt.Errorf("unexpected resync meta version: %d", brs.Version)
	}
	return brs, nil
}
// saveResyncStatus saves brs as the resync metadata file in bucket's
// replication metadata directory. The payload is a 4-byte header, a
// little-endian uint16 format followed by a little-endian uint16 version, and
// then the msgp encoding of brs.
func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationResyncStatus, objectAPI ObjectLayer) error {
	const headerLen = 4

	// Reserve room for the header plus the encoded status up front.
	buf := make([]byte, headerLen, headerLen+brs.Msgsize())
	// Initialize the resync meta header.
	binary.LittleEndian.PutUint16(buf[:2], resyncMetaFormat)
	binary.LittleEndian.PutUint16(buf[2:headerLen], resyncMetaVersion)

	encoded, err := brs.MarshalMsg(buf)
	if err != nil {
		return err
	}

	target := path.Join(bucketMetaPrefix, bucket, replicationDir, resyncFileName)
	return saveConfig(ctx, objectAPI, target, encoded)
}
2022-07-21 11:05:44 -07:00
// getReplicationDiff returns unreplicated objects in a channel.
//
// Objects under opts.Prefix are walked in the background and one
// madmin.DiffInfo is sent per object that is not fully replicated. With
// opts.Verbose set, completed and replica objects and targets are reported
// too. When opts.ARN is non-empty the per-target details are limited to that
// ARN. Objects under a versioning-suspended prefix, and objects with no
// replication status that do not qualify for resync, are skipped.
//
// The returned channel is buffered and is closed when the walk ends or ctx is
// done. On error the returned channel is nil.
func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (diffCh chan madmin.DiffInfo, err error) {
	// Resolve the replication config and targets before starting the walk, so
	// a failure here cannot leave the walk feeding a channel nobody reads.
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}
	tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
	if err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}
	rcfg := replicationConfig{
		Config:  cfg,
		remotes: tgts,
	}

	objInfoCh := make(chan ObjectInfo)
	if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}

	diffCh = make(chan madmin.DiffInfo, 4000)
	go func() {
		defer close(diffCh)
		for obj := range objInfoCh {
			// Ignore object prefixes which are excluded
			// from versioning via the MinIO bucket versioning extension.
			if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) {
				continue
			}
			roi := getHealReplicateObjectInfo(obj, rcfg)
			switch roi.ReplicationStatus {
			case replication.Completed, replication.Replica:
				// Fully replicated objects are only of interest in verbose mode.
				if !opts.Verbose {
					continue
				}
				fallthrough
			default:
				// ignore pre-existing objects that don't satisfy replication rule(s)
				if roi.ReplicationStatus.Empty() && !roi.ExistingObjResync.mustResync() {
					continue
				}
				tgtsMap := make(map[string]madmin.TgtDiffInfo)
				for arn, st := range roi.TargetStatuses {
					if opts.ARN == "" || opts.ARN == arn {
						if !opts.Verbose && (st == replication.Completed || st == replication.Replica) {
							continue
						}
						tgtsMap[arn] = madmin.TgtDiffInfo{
							ReplicationStatus: st.String(),
						}
					}
				}
				for arn, st := range roi.TargetPurgeStatuses {
					if opts.ARN == "" || opts.ARN == arn {
						if !opts.Verbose && st == Complete {
							continue
						}
						// Add the delete status to the target's entry,
						// creating the entry if the target has none yet.
						t, ok := tgtsMap[arn]
						if !ok {
							t = madmin.TgtDiffInfo{}
						}
						t.DeleteReplicationStatus = string(st)
						tgtsMap[arn] = t
					}
				}
				select {
				case diffCh <- madmin.DiffInfo{
					Object:                  obj.Name,
					VersionID:               obj.VersionID,
					LastModified:            obj.ModTime,
					IsDeleteMarker:          obj.DeleteMarker,
					ReplicationStatus:       string(roi.ReplicationStatus),
					DeleteReplicationStatus: string(roi.VersionPurgeStatus),
					ReplicationTimestamp:    roi.ReplicationTimestamp,
					Targets:                 tgtsMap,
				}:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return diffCh, nil
}
2022-08-09 15:00:24 -07:00
// QueueReplicationHeal looks up the replication configuration and remote
// targets of bucket and passes the object version on to queueReplicationHeal.
// Objects without a version ID are ignored.
func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) {
	if oi.VersionID == "" {
		// un-versioned object
		return
	}
	// Lookup errors are deliberately dropped here: queueReplicationHeal
	// returns early when the config or the targets are nil.
	cfg, _, _ := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
	bucketTargets, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
	healCfg := replicationConfig{
		Config:  cfg,
		remotes: bucketTargets,
	}
	queueReplicationHeal(ctx, bucket, oi, healCfg)
}
// queueReplicationHeal queues an object version for replication healing when
// its replication (or version purge) is pending or failed, or when it has to be
// resynced as an existing object.
//
// It returns the ReplicateObjectInfo computed for the version. The zero value
// is returned for un-versioned objects and when rcfg has no config or remotes.
func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig) (roi ReplicateObjectInfo) {
	// un-versioned objects and incomplete replication setups are not healed
	if oi.VersionID == "" || rcfg.Config == nil || rcfg.remotes == nil {
		return roi
	}

	roi = getHealReplicateObjectInfo(oi, rcfg)
	if !roi.Dsc.ReplicateAny() {
		return roi
	}

	// Replication already completed and no version purge outstanding: carry on
	// only if this version is an existing object that still needs a resync.
	replicated := oi.ReplicationStatus == replication.Completed && oi.VersionPurgeStatus.Empty()
	if replicated && !roi.ExistingObjResync.mustResync() {
		return roi
	}

	if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
		// With a purge status set the entry is a versioned delete and carries
		// VersionID; otherwise it is a delete marker and carries
		// DeleteMarkerVersionID.
		var versionID, dmVersionID string
		if !roi.VersionPurgeStatus.Empty() {
			versionID = roi.VersionID
		} else {
			dmVersionID = roi.VersionID
		}

		deleted := DeletedObject{
			ObjectName:            roi.Name,
			DeleteMarkerVersionID: dmVersionID,
			VersionID:             versionID,
			ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
			DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
			DeleteMarker:          roi.DeleteMarker,
		}
		dv := DeletedObjectReplicationInfo{
			DeletedObject: deleted,
			Bucket:        roi.Bucket,
			OpType:        replication.HealReplicationType,
			EventType:     ReplicateHealDelete,
		}

		switch {
		case roi.ReplicationStatus == replication.Pending,
			roi.ReplicationStatus == replication.Failed,
			roi.VersionPurgeStatus == Failed,
			roi.VersionPurgeStatus == Pending:
			// heal delete marker replication failure or versioned delete replication failure
			globalReplicationPool.queueReplicaDeleteTask(dv)
		case roi.ExistingObjResync.mustResync() &&
			(roi.ReplicationStatus == replication.Completed || roi.ReplicationStatus.Empty()):
			// delete marker already replicated (or never replicated) but an
			// existing object resync is required
			queueReplicateDeletesWrapper(dv, roi.ExistingObjResync)
		}
		return roi
	}

	if roi.ExistingObjResync.mustResync() {
		roi.OpType = replication.ExistingObjectReplicationType
	}

	// Pending or failed replication is healed first and takes precedence over
	// an existing-object resync.
	if roi.ReplicationStatus == replication.Pending || roi.ReplicationStatus == replication.Failed {
		roi.EventType = ReplicateHeal
		globalReplicationPool.queueReplicaTask(roi)
		return roi
	}

	if roi.ExistingObjResync.mustResync() {
		roi.EventType = ReplicateExisting
		globalReplicationPool.queueReplicaTask(roi)
	}
	return roi
}
2022-08-22 16:53:06 -07:00
// mrfTimeInterval is the period of the timers used by persistMRF (periodic
// flush of collected MRF entries to disk) and processMRF (periodic re-queueing
// of saved MRF entries for healing).
const mrfTimeInterval = 5 * time . Minute
// persistMRF collects the entries received on p.mrfSaveCh, keyed by version ID,
// and writes them to disk through saveMRFEntries. It runs until p.ctx is done
// or p.mrfSaveCh is closed.
//
// A flush happens:
//   - every mrfTimeInterval,
//   - when the number of collected entries reaches the channel capacity or the
//     channel is at least 80% full (anything queued is drained first),
//   - on shutdown (p.ctx done), after closing the channel and draining it.
func (p *ReplicationPool) persistMRF() {
	var mu sync.Mutex
	entries := make(map[string]MRFReplicateEntry)
	mTimer := time.NewTimer(mrfTimeInterval)
	defer mTimer.Stop()

	// saveMRFToDisk writes the collected entries and starts a fresh batch.
	// With drain set, whatever is currently queued on p.mrfSaveCh is folded
	// into the batch first.
	saveMRFToDisk := func(drain bool) {
		mu.Lock()
		defer mu.Unlock()
		cctx := p.ctx
		if drain {
			// p.ctx may already be cancelled on this path (shutdown), so the
			// write uses a context that is not tied to it.
			cctx = context.Background()
			// Drain without blocking: ranging over the channel would only
			// terminate once it is closed, which is not the case when the
			// drain is triggered by the size threshold.
		drainLoop:
			for {
				select {
				case e, ok := <-p.mrfSaveCh:
					if !ok {
						break drainLoop
					}
					entries[e.versionID] = e
				default:
					break drainLoop
				}
			}
		}
		// Checked after draining so entries that were still queued are not
		// dropped when nothing had been collected yet.
		if len(entries) == 0 {
			return
		}
		if err := p.saveMRFEntries(cctx, entries); err != nil {
			logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
		}
		entries = make(map[string]MRFReplicateEntry)
	}

	for {
		select {
		case <-mTimer.C:
			saveMRFToDisk(false)
			mTimer.Reset(mrfTimeInterval)
		case <-p.ctx.Done():
			// NOTE(review): queueMRFSave guards its send with
			// GlobalContext, not p.ctx; a send racing with this close
			// would panic - confirm senders are stopped by then.
			close(p.mrfSaveCh)
			saveMRFToDisk(true)
			return
		case e, ok := <-p.mrfSaveCh:
			if !ok {
				return
			}
			var cnt int
			mu.Lock()
			entries[e.versionID] = e
			cnt = len(entries)
			mu.Unlock()
			if cnt >= cap(p.mrfSaveCh) || len(p.mrfSaveCh) >= int(0.8*float32(cap(p.mrfSaveCh))) {
				saveMRFToDisk(true)
			}
		}
	}
}
// queueMRFSave hands entry to the persistence loop through p.mrfSaveCh.
// It blocks until the entry is accepted or GlobalContext is done, and is a
// no-op on a nil pool.
func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
	if p != nil {
		select {
		case p.mrfSaveCh <- entry:
		case <-GlobalContext.Done():
		}
	}
}
// saveMRFEntries writes entries to a new, uniquely named file
// (<uuid>.bin under replicationMRFDir). The payload is preceded by a 4-byte
// little-endian header holding mrfMetaFormat and mrfMetaVersion, the values
// loadMRF checks when reading the file back.
//
// An empty entries map is a no-op. The returned error is the marshalling error
// or whatever saveConfig returns.
func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error {
	if len(entries) == 0 {
		return nil
	}
	v := MRFReplicateEntries{
		Entries: entries,
		Version: mrfMetaVersionV1,
	}
	// Length 4 covers the header; the capacity reserves room for the payload.
	data := make([]byte, 4, v.Msgsize()+4)
	// Initialize the mrf meta header. The mrf constants are used (not the
	// resync ones) so the header matches what loadMRF validates.
	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)
	buf, err := v.MarshalMsg(data)
	if err != nil {
		return err
	}
	configFile := path.Join(replicationMRFDir, mustGetUUID()+".bin")
	return saveConfig(ctx, p.objLayer, configFile, buf)
}
// loadMRF reads fileName through readConfig and decodes it into
// MRFReplicateEntries.
//
// A missing or empty file yields the zero value and a nil error. An error is
// returned when the read fails for another reason, when the data holds nothing
// beyond the 4-byte header, when the header format/version or the decoded
// Version field is not the expected one, or when decoding fails.
func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e error) {
	data, err := readConfig(p.ctx, p.objLayer, fileName)
	if err != nil && err != errConfigNotFound {
		return re, err
	}

	switch size := len(data); {
	case size == 0:
		// Seems to be empty.
		return re, nil
	case size <= 4:
		return re, fmt.Errorf("replication mrf: no data")
	}

	// Validate the mrf meta header: format first, then version.
	if format := binary.LittleEndian.Uint16(data[0:2]); format != mrfMetaFormat {
		return re, fmt.Errorf("replication mrf: unknown format: %d", format)
	}
	if version := binary.LittleEndian.Uint16(data[2:4]); version != mrfMetaVersion {
		return re, fmt.Errorf("replication mrf: unknown version: %d", version)
	}

	// Header is fine, decode the payload that follows it.
	if _, err = re.UnmarshalMsg(data[4:]); err != nil {
		return re, err
	}
	if re.Version != mrfMetaVersionV1 {
		return re, fmt.Errorf("unexpected mrf meta version: %d", re.Version)
	}
	return re, nil
}
// processMRF periodically (every mrfTimeInterval) walks the saved MRF files
// under replicationMRFDir in minioMetaBucket, re-queues their entries for
// healing via queueMRFHeal and deletes each file that was queued without
// error. A round is skipped when every listed target is offline. The loop
// ends when p.ctx is done; it does nothing on a nil pool or object layer.
func (p *ReplicationPool) processMRF() {
	if p == nil || p.objLayer == nil {
		return
	}
	pTimer := time.NewTimer(mrfTimeInterval)
	defer pTimer.Stop()

	// healSaved performs the work of a single timer tick.
	healSaved := func() {
		// skip healing if all targets are offline
		remotes := globalBucketTargetSys.ListTargets(p.ctx, "", "")
		reachable := len(remotes)
		for _, remote := range remotes {
			if globalBucketTargetSys.isOffline(remote.URL()) {
				reachable--
			}
		}
		if reachable == 0 {
			return
		}

		walkCtx, stopWalk := context.WithCancel(p.ctx)
		defer stopWalk()
		savedCh := make(chan ObjectInfo)
		if err := p.objLayer.Walk(walkCtx, minioMetaBucket, replicationMRFDir, savedCh, ObjectOptions{}); err != nil {
			logger.LogIf(p.ctx, err)
			return
		}
		for saved := range savedCh {
			if p.queueMRFHeal(saved.Name) != nil {
				// keep the file so that it is picked up again next round
				continue
			}
			p.objLayer.DeleteObject(p.ctx, minioMetaBucket, saved.Name, ObjectOptions{})
		}
	}

	for {
		select {
		case <-p.ctx.Done():
			return
		case <-pTimer.C:
			healSaved()
			pTimer.Reset(mrfTimeInterval)
		}
	}
}
// queueMRFHeal loads the MRF entries stored in file and queues a replication
// heal for every entry whose object version can still be looked up. Entries
// whose lookup fails are skipped.
//
// It returns errServerNotInitialized on a nil pool or object layer, the error
// from loadMRF if the file cannot be loaded, and nil otherwise.
func (p *ReplicationPool) queueMRFHeal(file string) error {
	if p == nil || p.objLayer == nil {
		return errServerNotInitialized
	}
	saved, loadErr := p.loadMRF(file)
	if loadErr != nil {
		return loadErr
	}
	for versionID, entry := range saved.Entries {
		lookup := ObjectOptions{
			VersionID: versionID,
		}
		if objInfo, infoErr := p.objLayer.GetObjectInfo(p.ctx, entry.Bucket, entry.Object, lookup); infoErr == nil {
			QueueReplicationHeal(p.ctx, entry.Bucket, objInfo)
		}
	}
	return nil
}