// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"math/rand"
	"net/http"
	"path"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/minio/madmin-go/v2"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/encrypt"
	"github.com/minio/minio-go/v7/pkg/tags"
	"github.com/minio/minio/internal/amztime"
	"github.com/minio/minio/internal/bucket/bandwidth"
	objectlock "github.com/minio/minio/internal/bucket/object/lock"
	"github.com/minio/minio/internal/bucket/replication"
	"github.com/minio/minio/internal/config/storageclass"
	"github.com/minio/minio/internal/crypto"
	"github.com/minio/minio/internal/event"
	"github.com/minio/minio/internal/hash"
	xhttp "github.com/minio/minio/internal/http"
	"github.com/minio/minio/internal/logger"
	"github.com/zeebo/xxh3"
)

const (
	throttleDeadline = 1 * time.Hour
	// ReplicationReset has reset id and timestamp of last reset operation
	ReplicationReset = "replication-reset"
	// ReplicationStatus has internal replication status - stringified representation of target's replication status for all replication
	// activity initiated from this cluster
	ReplicationStatus = "replication-status"
	// ReplicationTimestamp - the last time replication was initiated on this cluster for this object version
	ReplicationTimestamp = "replication-timestamp"
	// ReplicaStatus - this header is present if a replica was received by this cluster for this object version
	ReplicaStatus = "replica-status"
	// ReplicaTimestamp - the last time a replica was received by this cluster for this object version
	ReplicaTimestamp = "replica-timestamp"
	// TaggingTimestamp - the last time a tag metadata modification happened on this cluster for this object version
	TaggingTimestamp = "tagging-timestamp"
	// ObjectLockRetentionTimestamp - the last time a object lock metadata modification happened on this cluster for this object version
	ObjectLockRetentionTimestamp = "objectlock-retention-timestamp"
	// ObjectLockLegalHoldTimestamp - the last time a legal hold metadata modification happened on this cluster for this object version
	ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp"
	// ReplicationWorkerMultiplier is suggested worker multiplier if traffic exceeds replication worker capacity
	ReplicationWorkerMultiplier = 1.5
)

func isReplicationEnabled(ctx context.Context, bucketName string) bool {
	rc, _ := getReplicationConfig(ctx, bucketName)
	return rc != nil
}

// gets replication config associated to a given bucket name.
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
	rCfg, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
	if err != nil {
		if errors.Is(err, BucketReplicationConfigNotFound{Bucket: bucketName}) || errors.Is(err, errInvalidArgument) {
			return rCfg, err
		}
		logger.CriticalIf(ctx, err)
	}
	return rCfg, err
}

// validateReplicationDestination returns error if replication destination bucket missing or not configured
// It also returns true if replication destination is same as this server.
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config, checkRemote bool) (bool, APIError) {
	var arns []string
	if rCfg.RoleArn != "" {
		arns = append(arns, rCfg.RoleArn)
	} else {
		for _, rule := range rCfg.Rules {
			arns = append(arns, rule.Destination.String())
		}
	}
	var sameTarget bool
	for _, arnStr := range arns {
		arn, err := madmin.ParseARN(arnStr)
		if err != nil {
			return sameTarget, errorCodes.ToAPIErrWithErr(ErrBucketRemoteArnInvalid, err)
		}
		if arn.Type != madmin.ReplicationService {
			return sameTarget, toAPIError(ctx, BucketRemoteArnTypeInvalid{Bucket: bucket})
		}
		clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arnStr)
		if clnt == nil {
			return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
		}
		if checkRemote { // validate remote bucket
			found, err := clnt.BucketExists(ctx, arn.Bucket)
			if err != nil {
				return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, err)
			}
			if !found {
				return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, BucketRemoteTargetNotFound{Bucket: arn.Bucket})
			}
			if ret, err := globalBucketObjectLockSys.Get(bucket); err == nil {
				if ret.LockEnabled {
					lock, _, _, _, err := clnt.GetObjectLockConfig(ctx, arn.Bucket)
					if err != nil {
						return sameTarget, errorCodes.ToAPIErrWithErr(ErrReplicationDestinationMissingLock, err)
					}
					if lock != objectlock.Enabled {
						return sameTarget, errorCodes.ToAPIErrWithErr(ErrReplicationDestinationMissingLock, nil)
					}
				}
			}
		}
		// validate replication ARN against target endpoint
		c := globalBucketTargetSys.GetRemoteTargetClient(ctx, arnStr)
		if c != nil {
			if c.EndpointURL().String() == clnt.EndpointURL().String() {
				selfTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
				if !sameTarget {
					sameTarget = selfTarget
				}
				continue
			}
		}
	}

	if len(arns) == 0 {
		return false, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
	}
	return sameTarget, toAPIError(ctx, nil)
}

type mustReplicateOptions struct {
	meta               map[string]string
	status             replication.StatusType
	opType             replication.Type
	replicationRequest bool // incoming request is a replication request
}

func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
	if rs, ok := o.meta[xhttp.AmzBucketReplicationStatus]; ok {
		return replication.StatusType(rs)
	}
	return s
}

func (o mustReplicateOptions) isExistingObjectReplication() bool {
	return o.opType == replication.ExistingObjectReplicationType
}

func (o mustReplicateOptions) isMetadataReplication() bool {
	return o.opType == replication.MetadataReplicationType
}

func getMustReplicateOptions(o ObjectInfo, op replication.Type, opts ObjectOptions) mustReplicateOptions {
	if !op.Valid() {
		op = replication.ObjectReplicationType
		if o.metadataOnly {
			op = replication.MetadataReplicationType
		}
	}
	meta := cloneMSS(o.UserDefined)
	if o.UserTags != "" {
		meta[xhttp.AmzObjectTagging] = o.UserTags
	}

	return mustReplicateOptions{
		meta:               meta,
		status:             o.ReplicationStatus,
		opType:             op,
		replicationRequest: opts.ReplicationRequest,
	}
}

// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
// a synchronous manner.
func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (dsc ReplicateDecision) {
	// object layer not initialized we return with no decision.
	if newObjectLayerFn() == nil {
		return
	}

	// Disable server-side replication on object prefixes which are excluded
	// from versioning via the MinIO bucket versioning extension.
	if !globalBucketVersioningSys.PrefixEnabled(bucket, object) {
		return
	}

	replStatus := mopts.ReplicationStatus()
	if replStatus == replication.Replica && !mopts.isMetadataReplication() {
		return
	}

	if mopts.replicationRequest { // incoming replication request on target cluster
		return
	}
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil {
		return
	}
	opts := replication.ObjectOpts{
		Name:           object,
		SSEC:           crypto.SSEC.IsEncrypted(mopts.meta),
		Replica:        replStatus == replication.Replica,
		ExistingObject: mopts.isExistingObjectReplication(),
	}
	tagStr, ok := mopts.meta[xhttp.AmzObjectTagging]
	if ok {
		opts.UserTags = tagStr
	}
	tgtArns := cfg.FilterTargetArns(opts)
	for _, tgtArn := range tgtArns {
		tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
		// the target online status should not be used here while deciding
		// whether to replicate as the target could be temporarily down
		opts.TargetArn = tgtArn
		replicate := cfg.Replicate(opts)
		var synchronous bool
		if tgt != nil {
			synchronous = tgt.replicateSync
		}
		dsc.Set(newReplicateTargetDecision(tgtArn, replicate, synchronous))
	}
	return dsc
}

// Standard headers that needs to be extracted from User metadata.
var standardHeaders = []string{
	xhttp.ContentType,
	xhttp.CacheControl,
	xhttp.ContentEncoding,
	xhttp.ContentLanguage,
	xhttp.ContentDisposition,
	xhttp.AmzStorageClass,
	xhttp.AmzObjectTagging,
	xhttp.AmzBucketReplicationStatus,
	xhttp.AmzObjectLockMode,
	xhttp.AmzObjectLockRetainUntilDate,
	xhttp.AmzObjectLockLegalHold,
	xhttp.AmzTagCount,
	xhttp.AmzServerSideEncryption,
}

// returns true if any of the objects being deleted qualifies for replication.
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
	c, err := getReplicationConfig(ctx, bucket)
	if err != nil || c == nil {
		return false
	}
	for _, obj := range objects {
		if c.HasActiveRules(obj.ObjectName, true) {
			return true
		}
	}
	return false
}

// isStandardHeader returns true if header is a supported header and not a custom header
func isStandardHeader(matchHeaderKey string) bool {
	return equals(matchHeaderKey, standardHeaders...)
}

// returns whether object version is a deletemarker and if object qualifies for replication
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, delOpts ObjectOptions, gerr error) (dsc ReplicateDecision) {
	rcfg, err := getReplicationConfig(ctx, bucket)
	if err != nil || rcfg == nil {
		return
	}
	// If incoming request is a replication request, it does not need to be re-replicated.
	if delOpts.ReplicationRequest {
		return
	}
	// Skip replication if this object's prefix is excluded from being
	// versioned.
	if !delOpts.Versioned {
		return
	}
	opts := replication.ObjectOpts{
		Name:         dobj.ObjectName,
		SSEC:         crypto.SSEC.IsEncrypted(oi.UserDefined),
		UserTags:     oi.UserTags,
		DeleteMarker: oi.DeleteMarker,
		VersionID:    dobj.VersionID,
		OpType:       replication.DeleteReplicationType,
	}
	tgtArns := rcfg.FilterTargetArns(opts)
	if len(tgtArns) > 0 {
		dsc.targetsMap = make(map[string]replicateTargetDecision, len(tgtArns))
		var sync, replicate bool
		for _, tgtArn := range tgtArns {
			opts.TargetArn = tgtArn
			replicate = rcfg.Replicate(opts)
			// when incoming delete is removal of a delete marker(a.k.a versioned delete),
			// GetObjectInfo returns extra information even though it returns errFileNotFound
			if gerr != nil {
				validReplStatus := false
				switch oi.TargetReplicationStatus(tgtArn) {
				case replication.Pending, replication.Completed, replication.Failed:
					validReplStatus = true
				}
				if oi.DeleteMarker && (validReplStatus || replicate) {
					dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
					continue
				} else {
					// can be the case that other cluster is down and duplicate `mc rm --vid`
					// is issued - this still needs to be replicated back to the other target
					replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed
					dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
					continue
				}
			}
			tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
			// the target online status should not be used here while deciding
			// whether to replicate deletes as the target could be temporarily down
			tgtDsc := newReplicateTargetDecision(tgtArn, false, false)
			if tgt != nil {
				tgtDsc = newReplicateTargetDecision(tgtArn, replicate, tgt.replicateSync)
			}
			dsc.Set(tgtDsc)
		}
	}
	return dsc
}

// replicate deletes to the designated replication target if replication configuration
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
// is specified) enabled.
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
// on target.
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) {
	var replicationStatus replication.StatusType
	bucket := dobj.Bucket
	versionID := dobj.DeleteMarkerVersionID
	if versionID == "" {
		versionID = dobj.VersionID
	}

	defer func() {
		replStatus := string(replicationStatus)
		auditLogInternal(context.Background(), AuditLogOptions{
			Event:     dobj.EventType,
			APIName:   ReplicateDeleteAPI,
			Bucket:    bucket,
			Object:    dobj.ObjectName,
			VersionID: versionID,
			Status:    replStatus,
		})
	}()

	rcfg, err := getReplicationConfig(ctx, bucket)
	if err != nil || rcfg == nil {
		logger.LogIf(ctx, err)
		sendEvent(eventArgs{
			BucketName: bucket,
			Object: ObjectInfo{
				Bucket:       bucket,
				Name:         dobj.ObjectName,
				VersionID:    versionID,
				DeleteMarker: dobj.DeleteMarker,
			},
			Host:      "Internal: [Replication]",
			EventName: event.ObjectReplicationNotTracked,
		})
		return
	}
	dsc, err := parseReplicateDecision(dobj.ReplicationState.ReplicateDecisionStr)
	if err != nil {
		logger.LogIf(ctx, err)
		sendEvent(eventArgs{
			BucketName: bucket,
			Object: ObjectInfo{
				Bucket:       bucket,
				Name:         dobj.ObjectName,
				VersionID:    versionID,
				DeleteMarker: dobj.DeleteMarker,
			},
			Host:      "Internal: [Replication]",
			EventName: event.ObjectReplicationNotTracked,
		})
		return
	}

	// Lock the object name before starting replication operation.
	// Use separate lock that doesn't collide with regular objects.
	lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, dobj.TargetArn))
		sendEvent(eventArgs{
			BucketName: bucket,
			Object: ObjectInfo{
				Bucket:       bucket,
				Name:         dobj.ObjectName,
				VersionID:    versionID,
				DeleteMarker: dobj.DeleteMarker,
			},
			Host:      "Internal: [Replication]",
			EventName: event.ObjectReplicationNotTracked,
		})
		return
	}
	ctx = lkctx.Context()
	defer lk.Unlock(lkctx)

	var wg sync.WaitGroup
	var rinfos replicatedInfos
	rinfos.Targets = make([]replicatedTargetInfo, len(dsc.targetsMap))
	idx := -1
	for tgtArn := range dsc.targetsMap {
		idx++
		tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
		if tgt == nil {
			logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn))
			sendEvent(eventArgs{
				BucketName: bucket,
				Object: ObjectInfo{
					Bucket:       bucket,
					Name:         dobj.ObjectName,
					VersionID:    versionID,
					DeleteMarker: dobj.DeleteMarker,
				},
				Host:      "Internal: [Replication]",
				EventName: event.ObjectReplicationNotTracked,
			})
			continue
		}
		if tgt := dsc.targetsMap[tgtArn]; !tgt.Replicate {
			continue
		}
		// if dobj.TargetArn is not empty string, this is a case of specific target being re-synced.
		if dobj.TargetArn != "" && dobj.TargetArn != tgt.ARN {
			continue
		}
		wg.Add(1)
		go func(index int, tgt *TargetClient) {
			defer wg.Done()
			rinfo := replicateDeleteToTarget(ctx, dobj, tgt)
			rinfos.Targets[index] = rinfo
		}(idx, tgt)
	}
	wg.Wait()

	replicationStatus = rinfos.ReplicationStatus()
	prevStatus := dobj.DeleteMarkerReplicationStatus()

	if dobj.VersionID != "" {
		prevStatus = replication.StatusType(dobj.VersionPurgeStatus())
		replicationStatus = replication.StatusType(rinfos.VersionPurgeStatus())
	}

	// to decrement pending count later.
	for _, rinfo := range rinfos.Targets {
		if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
			globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, 0, replicationStatus,
				prevStatus, replication.DeleteReplicationType)
		}
	}

	eventName := event.ObjectReplicationComplete
	if replicationStatus == replication.Failed {
		eventName = event.ObjectReplicationFailed
		globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
	}
	drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID)
	if replicationStatus != prevStatus {
		drs.ReplicationTimeStamp = UTCNow()
	}

	dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
		VersionID:         versionID,
		MTime:             dobj.DeleteMarkerMTime.Time,
		DeleteReplication: drs,
		Versioned:         globalBucketVersioningSys.PrefixEnabled(bucket, dobj.ObjectName),
		// Objects matching prefixes should not leave delete markers,
		// dramatically reduces namespace pollution while keeping the
		// benefits of replication, make sure to apply version suspension
		// only at bucket level instead.
		VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
	})
	if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on.
		logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err))
		sendEvent(eventArgs{
			BucketName: bucket,
			Object: ObjectInfo{
				Bucket:       bucket,
				Name:         dobj.ObjectName,
				VersionID:    versionID,
				DeleteMarker: dobj.DeleteMarker,
			},
			Host:      "Internal: [Replication]",
			EventName: eventName,
		})
	} else {
		sendEvent(eventArgs{
			BucketName: bucket,
			Object:     dobjInfo,
			Host:       "Internal: [Replication]",
			EventName:  eventName,
		})
	}
}

func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationInfo, tgt *TargetClient) (rinfo replicatedTargetInfo) {
	versionID := dobj.DeleteMarkerVersionID
	if versionID == "" {
		versionID = dobj.VersionID
	}

	rinfo = dobj.ReplicationState.targetState(tgt.ARN)
	rinfo.OpType = dobj.OpType
	defer func() {
		if rinfo.ReplicationStatus == replication.Completed && tgt.ResetID != "" && dobj.OpType == replication.ExistingObjectReplicationType {
			rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
		}
	}()

	if dobj.VersionID == "" && rinfo.PrevReplicationStatus == replication.Completed && dobj.OpType != replication.ExistingObjectReplicationType {
		rinfo.ReplicationStatus = rinfo.PrevReplicationStatus
		return
	}
	if dobj.VersionID != "" && rinfo.VersionPurgeStatus == Complete {
		return
	}
	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
		logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", dobj.Bucket, tgt.ARN))
		sendEvent(eventArgs{
			BucketName: dobj.Bucket,
			Object: ObjectInfo{
				Bucket:       dobj.Bucket,
				Name:         dobj.ObjectName,
				VersionID:    dobj.VersionID,
				DeleteMarker: dobj.DeleteMarker,
			},
			Host:      "Internal: [Replication]",
			EventName: event.ObjectReplicationNotTracked,
		})
		if dobj.VersionID == "" {
			rinfo.ReplicationStatus = replication.Failed
		} else {
			rinfo.VersionPurgeStatus = Failed
		}
		return
	}
	// early return if already replicated delete marker for existing object replication/ healing delete markers
	if dobj.DeleteMarkerVersionID != "" {
		toi, err := tgt.StatObject(ctx, tgt.Bucket, dobj.ObjectName, minio.StatObjectOptions{
			VersionID: versionID,
			Internal: minio.AdvancedGetOptions{
				ReplicationProxyRequest:           "false",
				IsReplicationReadyForDeleteMarker: true,
			},
		})
		if isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
			if dobj.VersionID == "" {
				rinfo.ReplicationStatus = replication.Completed
				return
			}
		}
		if !isErrObjectNotFound(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
			// mark delete marker replication as failed if target cluster not ready to receive
			// this request yet (object version not replicated yet)
			if err != nil && !toi.ReplicationReady {
				rinfo.ReplicationStatus = replication.Failed
				return
			}
		}
	}

	rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, minio.RemoveObjectOptions{
		VersionID: versionID,
		Internal: minio.AdvancedRemoveOptions{
			ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
			ReplicationMTime:        dobj.DeleteMarkerMTime.Time,
			ReplicationStatus:       minio.ReplicationStatusReplica,
			ReplicationRequest:      true, // always set this to distinguish between `mc mirror` replication and serverside
		},
	})
	if rmErr != nil {
		if dobj.VersionID == "" {
			rinfo.ReplicationStatus = replication.Failed
		} else {
			rinfo.VersionPurgeStatus = Failed
		}
		logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", tgt.Bucket, dobj.ObjectName, versionID, rmErr))
	} else {
		if dobj.VersionID == "" {
			rinfo.ReplicationStatus = replication.Completed
		} else {
			rinfo.VersionPurgeStatus = Complete
		}
	}
	return
}

func getCopyObjMetadata(oi ObjectInfo, sc string) map[string]string {
	meta := make(map[string]string, len(oi.UserDefined))
	for k, v := range oi.UserDefined {
		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
			continue
		}

		if equals(k, xhttp.AmzBucketReplicationStatus) {
			continue
		}

		// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
		if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
			continue
		}
		meta[k] = v
	}

	if oi.ContentEncoding != "" {
		meta[xhttp.ContentEncoding] = oi.ContentEncoding
	}

	if oi.ContentType != "" {
		meta[xhttp.ContentType] = oi.ContentType
	}

	if oi.UserTags != "" {
		meta[xhttp.AmzObjectTagging] = oi.UserTags
		meta[xhttp.AmzTagDirective] = "REPLACE"
	}

	if sc == "" {
		sc = oi.StorageClass
	}
	// drop non standard storage classes for tiering from replication
	if sc != "" && (sc == storageclass.RRS || sc == storageclass.STANDARD) {
		meta[xhttp.AmzStorageClass] = sc
	}

	meta[xhttp.MinIOSourceETag] = oi.ETag
	meta[xhttp.MinIOSourceMTime] = oi.ModTime.UTC().Format(time.RFC3339Nano)
	meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
	return meta
}

type caseInsensitiveMap map[string]string

// Lookup map entry case insensitively.
func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
	if len(m) == 0 {
		return "", false
	}
	for _, k := range []string{
		key,
		strings.ToLower(key),
		http.CanonicalHeaderKey(key),
	} {
		v, ok := m[k]
		if ok {
			return v, ok
		}
	}
	return "", false
}

func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts minio.PutObjectOptions, err error) {
	meta := make(map[string]string)
	for k, v := range objInfo.UserDefined {
		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
			continue
		}
		if isStandardHeader(k) {
			continue
		}
		meta[k] = v
	}

	if sc == "" && (objInfo.StorageClass == storageclass.STANDARD || objInfo.StorageClass == storageclass.RRS) {
		sc = objInfo.StorageClass
	}
	putOpts = minio.PutObjectOptions{
		UserMetadata:    meta,
		ContentType:     objInfo.ContentType,
		ContentEncoding: objInfo.ContentEncoding,
		StorageClass:    sc,
		Internal: minio.AdvancedPutOptions{
			SourceVersionID:    objInfo.VersionID,
			ReplicationStatus:  minio.ReplicationStatusReplica,
			SourceMTime:        objInfo.ModTime,
			SourceETag:         objInfo.ETag,
			ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
		},
	}
	if objInfo.UserTags != "" {
		tag, _ := tags.ParseObjectTags(objInfo.UserTags)
		if tag != nil {
			putOpts.UserTags = tag.ToMap()
			// set tag timestamp in opts
			tagTimestamp := objInfo.ModTime
			if tagTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp]; ok {
				tagTimestamp, err = time.Parse(time.RFC3339Nano, tagTmstampStr)
				if err != nil {
					return putOpts, err
				}
			}
			putOpts.Internal.TaggingTimestamp = tagTimestamp
		}
	}

	lkMap := caseInsensitiveMap(objInfo.UserDefined)
	if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok {
		putOpts.ContentLanguage = lang
	}
	if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok {
		putOpts.ContentDisposition = disp
	}
	if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok {
		putOpts.CacheControl = cc
	}
	if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok {
		rmode := minio.RetentionMode(mode)
		putOpts.Mode = rmode
	}
	if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
		rdate, err := amztime.ISO8601Parse(retainDateStr)
		if err != nil {
			return putOpts, err
		}
		putOpts.RetainUntilDate = rdate
		// set retention timestamp in opts
		retTimestamp := objInfo.ModTime
		if retainTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp]; ok {
			retTimestamp, err = time.Parse(time.RFC3339Nano, retainTmstampStr)
			if err != nil {
				return putOpts, err
			}
		}
		putOpts.Internal.RetentionTimestamp = retTimestamp
	}
	if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok {
		putOpts.LegalHold = minio.LegalHoldStatus(lhold)
		// set legalhold timestamp in opts
		lholdTimestamp := objInfo.ModTime
		if lholdTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp]; ok {
			lholdTimestamp, err = time.Parse(time.RFC3339Nano, lholdTmstampStr)
			if err != nil {
				return putOpts, err
			}
		}
		putOpts.Internal.LegalholdTimestamp = lholdTimestamp
	}
	if crypto.S3.IsEncrypted(objInfo.UserDefined) {
		putOpts.ServerSideEncryption = encrypt.NewSSE()
	}
	return
}

type replicationAction string

const (
	replicateMetadata replicationAction = "metadata"
	replicateNone     replicationAction = "none"
	replicateAll      replicationAction = "all"
)

// matches k1 with all keys, returns 'true' if one of them matches
func equals(k1 string, keys ...string) bool {
	for _, k2 := range keys {
		if strings.EqualFold(k1, k2) {
			return true
		}
	}
	return false
}

// returns replicationAction by comparing metadata between source and target
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo, opType replication.Type) replicationAction {
	// Avoid resyncing null versions created prior to enabling replication if target has a newer copy
	if opType == replication.ExistingObjectReplicationType &&
		oi1.ModTime.Unix() > oi2.LastModified.Unix() && oi1.VersionID == nullVersionID {
		return replicateNone
	}
	sz, _ := oi1.GetActualSize()

	// needs full replication
	if oi1.ETag != oi2.ETag ||
		oi1.VersionID != oi2.VersionID ||
		sz != oi2.Size ||
		oi1.DeleteMarker != oi2.IsDeleteMarker ||
		oi1.ModTime.Unix() != oi2.LastModified.Unix() {
		return replicateAll
	}

	if oi1.ContentType != oi2.ContentType {
		return replicateMetadata
	}

	if oi1.ContentEncoding != "" {
		enc, ok := oi2.Metadata[xhttp.ContentEncoding]
		if !ok {
			enc, ok = oi2.Metadata[strings.ToLower(xhttp.ContentEncoding)]
			if !ok {
				return replicateMetadata
			}
		}
		if strings.Join(enc, ",") != oi1.ContentEncoding {
			return replicateMetadata
		}
	}

	t, _ := tags.ParseObjectTags(oi1.UserTags)
	if !reflect.DeepEqual(oi2.UserTags, t.ToMap()) || (oi2.UserTagCount != len(t.ToMap())) {
		return replicateMetadata
	}

	// Compare only necessary headers
	compareKeys := []string{
		"Expires",
		"Cache-Control",
		"Content-Language",
		"Content-Disposition",
		"X-Amz-Object-Lock-Mode",
		"X-Amz-Object-Lock-Retain-Until-Date",
		"X-Amz-Object-Lock-Legal-Hold",
		"X-Amz-Website-Redirect-Location",
		"X-Amz-Meta-",
	}

	// compare metadata on both maps to see if meta is identical
	compareMeta1 := make(map[string]string)
	for k, v := range oi1.UserDefined {
		var found bool
		for _, prefix := range compareKeys {
			if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
				continue
			}
			found = true
			break
		}
		if found {
			compareMeta1[strings.ToLower(k)] = v
		}
	}

	compareMeta2 := make(map[string]string)
	for k, v := range oi2.Metadata {
		var found bool
		for _, prefix := range compareKeys {
			if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
				continue
			}
			found = true
			break
		}
		if found {
			compareMeta2[strings.ToLower(k)] = strings.Join(v, ",")
		}
	}

	if !reflect.DeepEqual(compareMeta1, compareMeta2) {
		return replicateMetadata
	}

	return replicateNone
}

// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
	var replicationStatus replication.StatusType
	defer func() {
		if replicationStatus.Empty() {
			// replication status is empty means
			// replication was not attempted for some
			// reason, notify the state of the object
			// on disk.
			replicationStatus = ri.ReplicationStatus
		}
		auditLogInternal(ctx, AuditLogOptions{
			Event:     ri.EventType,
			APIName:   ReplicateObjectAPI,
			Bucket:    ri.Bucket,
			Object:    ri.Name,
			VersionID: ri.VersionID,
			Status:    replicationStatus.String(),
		})
	}()

	objInfo := ri.ObjectInfo
	bucket := objInfo.Bucket
	object := objInfo.Name

	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil {
		logger.LogIf(ctx, err)
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return
	}
	tgtArns := cfg.FilterTargetArns(replication.ObjectOpts{
		Name:     object,
		SSEC:     crypto.SSEC.IsEncrypted(objInfo.UserDefined),
		UserTags: objInfo.UserTags,
	})
	// Lock the object name before starting replication.
	// Use separate lock that doesn't collide with regular objects.
	lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object)
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, ri.TargetArn))
		return
	}
	ctx = lkctx.Context()
	defer lk.Unlock(lkctx)

	var wg sync.WaitGroup
	var rinfos replicatedInfos
	rinfos.Targets = make([]replicatedTargetInfo, len(tgtArns))
	for i, tgtArn := range tgtArns {
		tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
		if tgt == nil {
			logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn))
			sendEvent(eventArgs{
				EventName:  event.ObjectReplicationNotTracked,
				BucketName: bucket,
				Object:     objInfo,
				Host:       "Internal: [Replication]",
			})
			continue
		}
		wg.Add(1)
		go func(index int, tgt *TargetClient) {
			defer wg.Done()
			if ri.OpType == replication.ObjectReplicationType {
				// all incoming calls go through optimized path.
				rinfos.Targets[index] = ri.replicateObject(ctx, objectAPI, tgt)
			} else {
				rinfos.Targets[index] = ri.replicateAll(ctx, objectAPI, tgt)
			}
		}(i, tgt)
	}
	wg.Wait()
	// FIXME: add support for missing replication events
	// - event.ObjectReplicationMissedThreshold
	// - event.ObjectReplicationReplicatedAfterThreshold
	eventName := event.ObjectReplicationComplete
	if rinfos.ReplicationStatus() == replication.Failed {
		eventName = event.ObjectReplicationFailed
	}
	newReplStatusInternal := rinfos.ReplicationStatusInternal()
	// Note that internal replication status(es) may match for previously replicated objects - in such cases
	// metadata should be updated with last resync timestamp.
	if objInfo.ReplicationStatusInternal != newReplStatusInternal || rinfos.ReplicationResynced() {
		popts := ObjectOptions{
			MTime:     objInfo.ModTime,
			VersionID: objInfo.VersionID,
			EvalMetadataFn: func(oi *ObjectInfo) error {
				oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = newReplStatusInternal
				oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
				oi.UserDefined[xhttp.AmzBucketReplicationStatus] = string(rinfos.ReplicationStatus())
				for _, rinfo := range rinfos.Targets {
					if rinfo.ResyncTimestamp != "" {
						oi.UserDefined[targetResetHeader(rinfo.Arn)] = rinfo.ResyncTimestamp
					}
				}
				if objInfo.UserTags != "" {
					oi.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
				}
				return nil
			},
		}

		if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
			logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w",
				bucket, objInfo.Name, objInfo.VersionID, err))
		}
		opType := replication.MetadataReplicationType
		if rinfos.Action() == replicateAll {
			opType = replication.ObjectReplicationType
		}
		for _, rinfo := range rinfos.Targets {
			if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
				globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.Duration, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType)
			}
		}
	}

	sendEvent(eventArgs{
		EventName:  eventName,
		BucketName: bucket,
		Object:     objInfo,
		Host:       "Internal: [Replication]",
	})

	// re-queue failures once more - keep a retry count to avoid flooding the queue if
	// the target site is down. Leave it to scanner to catch up instead.
	if rinfos.ReplicationStatus() != replication.Completed {
		ri.OpType = replication.HealReplicationType
		ri.EventType = ReplicateMRF
		ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
		ri.RetryCount++
		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
	}
}

// replicateObject replicates object data for specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
	startTime := time.Now()
	objInfo := ri.ObjectInfo.Clone()
	bucket := objInfo.Bucket
	object := objInfo.Name
	sz, _ := objInfo.GetActualSize()

	rAction := replicateAll
	rinfo = replicatedTargetInfo{
		Size:                  sz,
		Arn:                   tgt.ARN,
		PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
		ReplicationStatus:     replication.Failed,
		OpType:                ri.OpType,
		ReplicationAction:     rAction,
	}

	if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
		rinfo.ReplicationStatus = replication.Completed
		rinfo.ReplicationResynced = true
		return
	}

	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
		logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN))
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return
	}

	versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
	versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)

	gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
		VersionID:        objInfo.VersionID,
		Versioned:        versioned,
		VersionSuspended: versionSuspended,
	})
	if err != nil {
		if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
			sendEvent(eventArgs{
				EventName:  event.ObjectReplicationNotTracked,
				BucketName: bucket,
				Object:     objInfo,
				Host:       "Internal: [Replication]",
			})
			logger.LogIf(ctx, fmt.Errorf("Unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
		}
		return
	}
	defer gr.Close()

	objInfo = gr.ObjInfo

	size, err := objInfo.GetActualSize()
	if err != nil {
		logger.LogIf(ctx, err)
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return
	}

	if tgt.Bucket == "" {
		logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return rinfo
	}
	defer func() {
		if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
			rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
			rinfo.ReplicationResynced = true
		}
		rinfo.Duration = time.Since(startTime)
	}()

	rinfo.ReplicationStatus = replication.Completed
	rinfo.Size = size
	rinfo.ReplicationAction = rAction
	// use core client to avoid doing multipart on PUT
	c := &minio.Core{Client: tgt.Client}

	putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s err:%w", bucket, err))
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return
	}

	var headerSize int
	for k, v := range putOpts.Header() {
		headerSize += len(k) + len(v)
	}

	opts := &bandwidth.MonitorReaderOptions{
		Bucket:     objInfo.Bucket,
		TargetARN:  tgt.ARN,
		HeaderSize: headerSize,
	}
	newCtx := ctx
	if globalBucketMonitor.IsThrottled(bucket, tgt.ARN) {
		var cancel context.CancelFunc
		newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
		defer cancel()
	}
	r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
	if objInfo.isMultipart() {
		if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
			r, objInfo, putOpts); err != nil {
			if minio.ToErrorResponse(err).Code != "PreconditionFailed" {
				rinfo.ReplicationStatus = replication.Failed
				logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
			}
		}
	} else {
		if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil {
			if minio.ToErrorResponse(err).Code != "PreconditionFailed" {
				rinfo.ReplicationStatus = replication.Failed
				logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
			}
		}
	}
	return
}

// replicateAll replicates metadata for specified version of the object to destination bucket
// if the destination version is missing it automatically does fully copy as well.
// The source object is then updated to reflect the replication status.
func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
	startTime := time.Now()
	objInfo := ri.ObjectInfo.Clone()
	bucket := objInfo.Bucket
	object := objInfo.Name
	sz, _ := objInfo.GetActualSize()

	// set defaults for replication action based on operation being performed - actual
	// replication action can only be determined after stat on remote. This default is
	// needed for updating replication metrics correctly when target is offline.
	rAction := replicateMetadata

	rinfo = replicatedTargetInfo{
		Size:                  sz,
		Arn:                   tgt.ARN,
		PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
		ReplicationStatus:     replication.Failed,
		OpType:                ri.OpType,
		ReplicationAction:     rAction,
	}

	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
		logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN))
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return
	}

	versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
	versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)

	gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
		VersionID:        objInfo.VersionID,
		Versioned:        versioned,
		VersionSuspended: versionSuspended,
	})
	if err != nil {
		if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
			sendEvent(eventArgs{
				EventName:  event.ObjectReplicationNotTracked,
				BucketName: bucket,
				Object:     objInfo,
				Host:       "Internal: [Replication]",
			})
			logger.LogIf(ctx, fmt.Errorf("Unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
		}
		return
	}
	defer gr.Close()

	objInfo = gr.ObjInfo

	// use latest ObjectInfo to check if previous replication attempt succeeded
	if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
		rinfo.ReplicationStatus = replication.Completed
		rinfo.ReplicationResynced = true
		return
	}

	size, err := objInfo.GetActualSize()
	if err != nil {
		logger.LogIf(ctx, err)
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return
	}

	if tgt.Bucket == "" {
		logger.LogIf(ctx, fmt.Errorf("unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return rinfo
	}
	defer func() {
		if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
			rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
			rinfo.ReplicationResynced = true
		}
		rinfo.Duration = time.Since(startTime)
	}()

	rAction = replicateAll
	oi, cerr := tgt.StatObject(ctx, tgt.Bucket, object, minio.StatObjectOptions{
		VersionID: objInfo.VersionID,
		Internal: minio.AdvancedGetOptions{
			ReplicationProxyRequest: "false",
		},
	})
	if cerr == nil {
		rAction = getReplicationAction(objInfo, oi, ri.OpType)
		rinfo.ReplicationStatus = replication.Completed
		if rAction == replicateNone {
			if ri.OpType == replication.ExistingObjectReplicationType &&
				objInfo.ModTime.Unix() > oi.LastModified.Unix() && objInfo.VersionID == nullVersionID {
				logger.LogIf(ctx, fmt.Errorf("unable to replicate %s/%s (null). Newer version exists on target", bucket, object))
				sendEvent(eventArgs{
					EventName:  event.ObjectReplicationNotTracked,
					BucketName: bucket,
					Object:     objInfo,
					Host:       "Internal: [Replication]",
				})
			}
			// object with same VersionID already exists, replication kicked off by
			// PutObject might have completed
			if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending || objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType {
				// if metadata is not updated for some reason after replication, such as
				// 503 encountered while updating metadata - make sure to set ReplicationStatus
				// as Completed.
				//
				// Note: Replication Stats would have been updated despite metadata update failure.
				rinfo.ReplicationAction = rAction
				rinfo.ReplicationStatus = replication.Completed
			}
			return
		}
	}
	rinfo.ReplicationStatus = replication.Completed
	rinfo.Size = size
	rinfo.ReplicationAction = rAction
	// use core client to avoid doing multipart on PUT
	c := &minio.Core{Client: tgt.Client}
	if rAction != replicateAll {
		// replicate metadata for object tagging/copy with metadata replacement
		srcOpts := minio.CopySrcOptions{
			Bucket:    tgt.Bucket,
			Object:    object,
			VersionID: objInfo.VersionID,
		}
		dstOpts := minio.PutObjectOptions{
			Internal: minio.AdvancedPutOptions{
				SourceVersionID:    objInfo.VersionID,
				ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
			},
		}
		if _, err = c.CopyObject(ctx, tgt.Bucket, object, tgt.Bucket, object, getCopyObjMetadata(objInfo, tgt.StorageClass), srcOpts, dstOpts); err != nil {
			rinfo.ReplicationStatus = replication.Failed
			logger.LogIf(ctx, fmt.Errorf("unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
		}
	} else {
		var putOpts minio.PutObjectOptions
		putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo)
		if err != nil {
			logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s err:%w", bucket, err))
			sendEvent(eventArgs{
				EventName:  event.ObjectReplicationNotTracked,
				BucketName: bucket,
				Object:     objInfo,
				Host:       "Internal: [Replication]",
			})
			return
		}
		var headerSize int
		for k, v := range putOpts.Header() {
			headerSize += len(k) + len(v)
		}

		opts := &bandwidth.MonitorReaderOptions{
			Bucket:     objInfo.Bucket,
			TargetARN:  tgt.ARN,
			HeaderSize: headerSize,
		}
		newCtx := ctx
		if globalBucketMonitor.IsThrottled(bucket, tgt.ARN) {
			var cancel context.CancelFunc
			newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
			defer cancel()
		}
		r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
		if objInfo.isMultipart() {
			if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
				r, objInfo, putOpts); err != nil {
				if minio.ToErrorResponse(err).Code != "PreconditionFailed" {
					rinfo.ReplicationStatus = replication.Failed
					logger.LogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
				} else {
					rinfo.ReplicationStatus = replication.Completed
				}
			}
		} else {
			if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil {
				if minio.ToErrorResponse(err).Code != "PreconditionFailed" {
					rinfo.ReplicationStatus = replication.Failed
					logger.LogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
				} else {
					rinfo.ReplicationStatus = replication.Completed
				}
			}
		}
	}
	return
}

func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts minio.PutObjectOptions) (err error) {
	var uploadedParts []minio.CompletePart
	uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts)
	if err != nil {
		return err
	}

	defer func() {
		if err != nil {
			// block and abort remote upload upon failure.
			attempts := 1
			for attempts <= 3 {
				aerr := c.AbortMultipartUpload(ctx, bucket, object, uploadID)
				if aerr == nil {
					return
				}
				logger.LogIf(ctx,
					fmt.Errorf("Trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster",
						humanize.Ordinal(attempts), uploadID, bucket, object, aerr))
				attempts++
				time.Sleep(time.Second)
			}
		}
	}()

	var (
		hr    *hash.Reader
		pInfo minio.ObjectPart
	)

	for _, partInfo := range objInfo.Parts {
		hr, err = hash.NewReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
		if err != nil {
			return err
		}
		pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.ActualSize, "", "", opts.ServerSideEncryption)
		if err != nil {
			return err
		}
		if pInfo.Size != partInfo.ActualSize {
			return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
		}
		uploadedParts = append(uploadedParts, minio.CompletePart{
			PartNumber: pInfo.PartNumber,
			ETag:       pInfo.ETag,
		})
	}
	_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{
		Internal: minio.AdvancedPutOptions{
			SourceMTime: objInfo.ModTime,
			// always set this to distinguish between `mc mirror` replication and serverside
			ReplicationRequest: true,
		},
	})
	return err
}

// filterReplicationStatusMetadata filters replication status metadata for COPY
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
	// Copy on write
	dst := metadata
	var copied bool
	delKey := func(key string) {
		if _, ok := metadata[key]; !ok {
			return
		}
		if !copied {
			dst = make(map[string]string, len(metadata))
			for k, v := range metadata {
				dst[k] = v
			}
			copied = true
		}
		delete(dst, key)
	}

	delKey(xhttp.AmzBucketReplicationStatus)
	return dst
}

// DeletedObjectReplicationInfo has info on deleted object
type DeletedObjectReplicationInfo struct {
	DeletedObject
	Bucket    string
	EventType string
	OpType    replication.Type
	ResetID   string
	TargetArn string
}

// ToMRFEntry returns the relevant info needed by MRF
func (di DeletedObjectReplicationInfo) ToMRFEntry() MRFReplicateEntry {
	versionID := di.DeleteMarkerVersionID
	if versionID == "" {
		versionID = di.VersionID
	}
	return MRFReplicateEntry{
		Bucket:    di.Bucket,
		Object:    di.ObjectName,
		versionID: versionID,
	}
}

// Replication specific APIName
const (
	ReplicateObjectAPI = "ReplicateObject"
	ReplicateDeleteAPI = "ReplicateDelete"
)

const (
	// ReplicateQueued - replication being queued trail
	ReplicateQueued = "replicate:queue"

	// ReplicateExisting - audit trail for existing objects replication
	ReplicateExisting = "replicate:existing"
	// ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers
	ReplicateExistingDelete = "replicate:existing:delete"

	// ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
	ReplicateMRF = "replicate:mrf"
	// ReplicateIncoming - audit trail of inline replication
	ReplicateIncoming = "replicate:incoming"
	// ReplicateIncomingDelete - audit trail of inline replication of deletes.
	ReplicateIncomingDelete = "replicate:incoming:delete"

	// ReplicateHeal - audit trail for healing of failed/pending replications
	ReplicateHeal = "replicate:heal"
	// ReplicateHealDelete - audit trail of healing of failed/pending delete replications.
	ReplicateHealDelete = "replicate:heal:delete"
)

var (
	globalReplicationPool  *ReplicationPool
	globalReplicationStats *ReplicationStats
)

// ReplicationPool describes replication pool
type ReplicationPool struct {
	// atomic ops:
	activeWorkers    int32
	activeMRFWorkers int32

	objLayer ObjectLayer
	ctx      context.Context
	priority string
	mu       sync.RWMutex
	resyncer *replicationResyncer

	// workers:
	workers         []chan ReplicationWorkerOperation
	existingWorkers chan ReplicationWorkerOperation

	// mrf:
	mrfWorkerKillCh chan struct{}
	mrfReplicaCh    chan ReplicationWorkerOperation
	mrfSaveCh       chan MRFReplicateEntry
	mrfStopCh       chan struct{}
	mrfWorkerSize   int
	saveStateCh     chan struct{}
}

// ReplicationWorkerOperation is a shared interface of replication operations.
type ReplicationWorkerOperation interface {
	ToMRFEntry() MRFReplicateEntry
}

const (
	// WorkerMaxLimit max number of workers per node for "fast" mode
	WorkerMaxLimit = 500

	// WorkerMinLimit min number of workers per node for "slow" mode
	WorkerMinLimit = 50

	// WorkerAutoDefault is default number of workers for "auto" mode
	WorkerAutoDefault = 100

	// MRFWorkerMaxLimit max number of mrf workers per node for "fast" mode
	MRFWorkerMaxLimit = 8

	// MRFWorkerMinLimit min number of mrf workers per node for "slow" mode
	MRFWorkerMinLimit = 2

	// MRFWorkerAutoDefault is default number of mrf workers for "auto" mode
	MRFWorkerAutoDefault = 4
)

// NewReplicationPool creates a pool of replication workers of specified size
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
	var workers, failedWorkers int
	priority := "auto"
	if opts.Priority != "" {
		priority = opts.Priority
	}
	switch priority {
	case "fast":
		workers = WorkerMaxLimit
		failedWorkers = MRFWorkerMaxLimit
	case "slow":
		workers = WorkerMinLimit
		failedWorkers = MRFWorkerMinLimit
	default:
		workers = WorkerAutoDefault
		failedWorkers = MRFWorkerAutoDefault
	}

	pool := &ReplicationPool{
		workers:         make([]chan ReplicationWorkerOperation, 0, workers),
		existingWorkers: make(chan ReplicationWorkerOperation, 100000),
		mrfReplicaCh:    make(chan ReplicationWorkerOperation, 100000),
		mrfWorkerKillCh: make(chan struct{}, failedWorkers),
		resyncer:        newresyncer(),
		mrfSaveCh:       make(chan MRFReplicateEntry, 100000),
		mrfStopCh:       make(chan struct{}, 1),
		saveStateCh:     make(chan struct{}, 1),
		ctx:             ctx,
		objLayer:        o,
		priority:        priority,
	}

	pool.ResizeWorkers(workers, 0)
	pool.ResizeFailedWorkers(failedWorkers)
	go pool.AddWorker(pool.existingWorkers, nil)
	go pool.resyncer.PersistToDisk(ctx, o)
	go pool.processMRF()
	go pool.persistMRF()
	go pool.saveStatsToDisk()
	return pool
}

// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
// to the other workers
func (p *ReplicationPool) AddMRFWorker() {
	for {
		select {
		case <-p.ctx.Done():
			return
		case oi, ok := <-p.mrfReplicaCh:
			if !ok {
				return
			}
			switch v := oi.(type) {
			case ReplicateObjectInfo:
				atomic.AddInt32(&p.activeMRFWorkers, 1)
				replicateObject(p.ctx, v, p.objLayer)
				atomic.AddInt32(&p.activeMRFWorkers, -1)
			default:
				logger.LogOnceIf(p.ctx, fmt.Errorf("unknown mrf replication type: %T", oi), "unknown-mrf-replicate-type")
			}
		case <-p.mrfWorkerKillCh:
			return
		}
	}
}

// AddWorker adds a replication worker to the pool.
// An optional pointer to a tracker that will be atomically
// incremented when operations are running can be provided.
func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opTracker *int32) {
	for {
		select {
		case <-p.ctx.Done():
			return
		case oi, ok := <-input:
			if !ok {
				return
			}
			switch v := oi.(type) {
			case ReplicateObjectInfo:
				if opTracker != nil {
					atomic.AddInt32(opTracker, 1)
				}
				replicateObject(p.ctx, v, p.objLayer)
				if opTracker != nil {
					atomic.AddInt32(opTracker, -1)
				}
			case DeletedObjectReplicationInfo:
				if opTracker != nil {
					atomic.AddInt32(opTracker, 1)
				}
				replicateDelete(p.ctx, v, p.objLayer)
				if opTracker != nil {
					atomic.AddInt32(opTracker, -1)
				}
			default:
				logger.LogOnceIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type")
			}
		}
	}
}

// ActiveWorkers returns the number of active workers handling replication traffic.
func (p *ReplicationPool) ActiveWorkers() int {
	return int(atomic.LoadInt32(&p.activeWorkers))
}

// ActiveMRFWorkers returns the number of active workers handling replication failures.
func (p *ReplicationPool) ActiveMRFWorkers() int {
	return int(atomic.LoadInt32(&p.activeMRFWorkers))
}

// ResizeWorkers sets replication workers pool to new size.
// checkOld can be set to an expected value.
// If the worker count changed
func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if (checkOld > 0 && len(p.workers) != checkOld) || n == len(p.workers) || n < 1 {
		// Either already satisfied or worker count changed while we waited for the lock.
		return
	}
	for len(p.workers) < n {
		input := make(chan ReplicationWorkerOperation, 10000)
		p.workers = append(p.workers, input)

		go p.AddWorker(input, &p.activeWorkers)
	}
	for len(p.workers) > n {
		worker := p.workers[len(p.workers)-1]
		p.workers = p.workers[:len(p.workers)-1]
		close(worker)
	}
}

// ResizeWorkerPriority sets replication failed workers pool size
func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
	var workers, mrfWorkers int
	p.mu.Lock()
	switch pri {
	case "fast":
		workers = WorkerMaxLimit
		mrfWorkers = MRFWorkerMaxLimit
	case "slow":
		workers = WorkerMinLimit
		mrfWorkers = MRFWorkerMinLimit
	default:
		workers = WorkerAutoDefault
		mrfWorkers = MRFWorkerAutoDefault
		if len(p.workers) < WorkerAutoDefault {
			workers = int(math.Min(float64(len(p.workers)+1), WorkerAutoDefault))
		}
		if p.mrfWorkerSize < MRFWorkerAutoDefault {
			mrfWorkers = int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerAutoDefault))
		}
	}
	p.priority = pri
	p.mu.Unlock()
	p.ResizeWorkers(workers, 0)
	p.ResizeFailedWorkers(mrfWorkers)
}

// ResizeFailedWorkers sets replication failed workers pool size
func (p *ReplicationPool) ResizeFailedWorkers(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for p.mrfWorkerSize < n {
		p.mrfWorkerSize++
		go p.AddMRFWorker()
	}
	for p.mrfWorkerSize > n {
		p.mrfWorkerSize--
		go func() { p.mrfWorkerKillCh <- struct{}{} }()
	}
}

// getWorkerCh gets a worker channel deterministically based on bucket and object names.
// Must be able to grab read lock from p.
func (p *ReplicationPool) getWorkerCh(bucket, object string) chan<- ReplicationWorkerOperation {
	h := xxh3.HashString(bucket + object)
	p.mu.RLock()
	defer p.mu.RUnlock()
	if len(p.workers) == 0 {
		return nil
	}
	return p.workers[h%uint64(len(p.workers))]
}

func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
	if p == nil {
		return
	}
	var ch, healCh chan<- ReplicationWorkerOperation
	switch ri.OpType {
	case replication.ExistingObjectReplicationType:
		ch = p.existingWorkers
	case replication.HealReplicationType:
		ch = p.mrfReplicaCh
		healCh = p.getWorkerCh(ri.Name, ri.Bucket)
	default:
		ch = p.getWorkerCh(ri.Name, ri.Bucket)
	}
	if ch == nil && healCh == nil {
		return
	}

	select {
	case <-p.ctx.Done():
	case healCh <- ri:
	case ch <- ri:
	default:
		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
		p.mu.RLock()
		prio := p.priority
		p.mu.RUnlock()
		switch prio {
		case "fast":
			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic"), string(replicationSubsystem))
		case "slow":
			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
		default:
			if p.ActiveWorkers() < WorkerMaxLimit {
				p.mu.RLock()
				workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
				existing := len(p.workers)
				p.mu.RUnlock()
				p.ResizeWorkers(workers, existing)
			}
			if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
				p.mu.RLock()
				workers := int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerMaxLimit))
				p.mu.RUnlock()
				p.ResizeFailedWorkers(workers)
			}
		}
	}
}

func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObjectResync ResyncDecision) {
	for k, v := range existingObjectResync.targets {
		if v.Replicate {
			doi.ResetID = v.ResetID
			doi.TargetArn = k

			globalReplicationPool.queueReplicaDeleteTask(doi)
		}
	}
}

func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInfo) {
	if p == nil {
		return
	}
	var ch chan<- ReplicationWorkerOperation
	switch doi.OpType {
	case replication.ExistingObjectReplicationType:
		ch = p.existingWorkers
	case replication.HealReplicationType:
		fallthrough
	default:
		ch = p.getWorkerCh(doi.Bucket, doi.ObjectName)
	}

	select {
	case <-p.ctx.Done():
	case ch <- doi:
	default:
		globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
		p.mu.RLock()
		prio := p.priority
		p.mu.RUnlock()
		switch prio {
		case "fast":
			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes"), string(replicationSubsystem))
		case "slow":
			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
		default:
			if p.ActiveWorkers() < WorkerMaxLimit {
				p.mu.RLock()
				workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
				existing := len(p.workers)
				p.mu.RUnlock()
				p.ResizeWorkers(workers, existing)
			}
		}
	}
}

type replicationPoolOpts struct {
	Priority string
}

func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
	globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
		Priority: globalAPIConfig.getReplicationPriority(),
	})
	globalReplicationStats = NewReplicationStats(ctx, objectAPI)
	go globalReplicationStats.loadInitialReplicationMetrics(ctx)
}

type proxyResult struct {
	Proxy bool
	Err   error
}

// get Reader from replication target if active-active replication is in place and
// this node returns a 404
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, _ http.Header, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (gr *GetObjectReader, proxy proxyResult, err error) {
	tgt, oi, proxy := proxyHeadToRepTarget(ctx, bucket, object, rs, opts, proxyTargets)
	if !proxy.Proxy {
		return nil, proxy, nil
	}
	fn, _, _, err := NewGetObjectReader(nil, oi, opts)
	if err != nil {
		return nil, proxy, err
	}
	gopts := minio.GetObjectOptions{
		VersionID:            opts.VersionID,
		ServerSideEncryption: opts.ServerSideEncryption,
		Internal: minio.AdvancedGetOptions{
			ReplicationProxyRequest: "true",
		},
		PartNumber: opts.PartNumber,
	}
	// get correct offsets for encrypted object
	if rs != nil {
		h, err := rs.ToHeader()
		if err != nil {
			return nil, proxy, err
		}
		gopts.Set(xhttp.Range, h)
	}
	// Make sure to match ETag when proxying.
	if err = gopts.SetMatchETag(oi.ETag); err != nil {
		return nil, proxy, err
	}
	c := minio.Core{Client: tgt.Client}
	obj, _, h, err := c.GetObject(ctx, tgt.Bucket, object, gopts)
	if err != nil {
		return nil, proxy, err
	}
	closeReader := func() { obj.Close() }
	reader, err := fn(obj, h, closeReader)
	if err != nil {
		return nil, proxy, err
	}
	reader.ObjInfo = oi.Clone()
	if rs != nil {
		contentSize, err := parseSizeFromContentRange(h)
		if err != nil {
			return nil, proxy, err
		}
		reader.ObjInfo.Size = contentSize
	}

	return reader, proxyResult{Proxy: true}, nil
}

func getProxyTargets(ctx context.Context, bucket, object string, opts ObjectOptions) (tgts *madmin.BucketTargets) {
	if opts.VersionSuspended {
		return &madmin.BucketTargets{}
	}
	if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) {
		return &madmin.BucketTargets{}
	}
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil || cfg == nil {
		return &madmin.BucketTargets{}
	}
	topts := replication.ObjectOpts{Name: object}
	tgtArns := cfg.FilterTargetArns(topts)
	tgts = &madmin.BucketTargets{Targets: make([]madmin.BucketTarget, len(tgtArns))}
	for i, tgtArn := range tgtArns {
		tgt := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArn)
		tgts.Targets[i] = tgt
	}

	return tgts
}

func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgt *TargetClient, oi ObjectInfo, proxy proxyResult) {
	// this option is set when active-active replication is in place between site A -> B,
	// and site B does not have the object yet.
	if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
		return nil, oi, proxy
	}
	for _, t := range proxyTargets.Targets {
		tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, t.Arn)
		if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
			continue
		}
		// if proxying explicitly disabled on remote target
		if tgt.disableProxy {
			continue
		}

		gopts := minio.GetObjectOptions{
			VersionID:            opts.VersionID,
			ServerSideEncryption: opts.ServerSideEncryption,
			Internal: minio.AdvancedGetOptions{
				ReplicationProxyRequest: "true",
			},
			PartNumber: opts.PartNumber,
		}
		if rs != nil {
			h, err := rs.ToHeader()
			if err != nil {
				logger.LogIf(ctx, fmt.Errorf("Invalid range header for %s/%s(%s) - %w", bucket, object, opts.VersionID, err))
				continue
			}
			gopts.Set(xhttp.Range, h)
		}

		objInfo, err := tgt.StatObject(ctx, t.TargetBucket, object, gopts)
		if err != nil {
			if isErrInvalidRange(ErrorRespToObjectError(err, bucket, object)) {
				return nil, oi, proxyResult{Err: err}
			}
			continue
		}

		tags, _ := tags.MapToObjectTags(objInfo.UserTags)
		oi = ObjectInfo{
			Bucket:                    bucket,
			Name:                      object,
			ModTime:                   objInfo.LastModified,
			Size:                      objInfo.Size,
			ETag:                      objInfo.ETag,
			VersionID:                 objInfo.VersionID,
			IsLatest:                  objInfo.IsLatest,
			DeleteMarker:              objInfo.IsDeleteMarker,
			ContentType:               objInfo.ContentType,
			Expires:                   objInfo.Expires,
			StorageClass:              objInfo.StorageClass,
			ReplicationStatusInternal: objInfo.ReplicationStatus,
			UserTags:                  tags.String(),
		}
		oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
		for k, v := range objInfo.Metadata {
			oi.UserDefined[k] = v[0]
		}
		ce, ok := oi.UserDefined[xhttp.ContentEncoding]
		if !ok {
			ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
		}
		if ok {
			oi.ContentEncoding = ce
		}
		return tgt, oi, proxyResult{Proxy: true}
	}
	return nil, oi, proxy
}

// get object info from replication target if active-active replication is in place and
// this node returns a 404
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (oi ObjectInfo, proxy proxyResult) {
	_, oi, proxy = proxyHeadToRepTarget(ctx, bucket, object, rs, opts, proxyTargets)
	return oi, proxy
}

func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) {
	ri := ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc, EventType: ReplicateIncoming}
	if dsc.Synchronous() {
		replicateObject(ctx, ri, o)
	} else {
		globalReplicationPool.queueReplicaTask(ri)
	}
	if sz, err := objInfo.GetActualSize(); err == nil {
		for arn := range dsc.targetsMap {
			globalReplicationStats.Update(objInfo.Bucket, arn, sz, 0, objInfo.ReplicationStatus, replication.StatusType(""), opType)
		}
	}
}

func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
	globalReplicationPool.queueReplicaDeleteTask(dv)
	for arn := range dv.ReplicationState.Targets {
		globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
	}
	for arn := range dv.ReplicationState.PurgeTargets {
		globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
	}
}

type replicationConfig struct {
	Config  *replication.Config
	remotes *madmin.BucketTargets
}

func (c replicationConfig) Empty() bool {
	return c.Config == nil
}

func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
	return c.Config.Replicate(opts)
}

// Resync returns true if replication reset is requested
func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
	if c.Empty() {
		return
	}

	// Now overlay existing object replication choices for target
	if oi.DeleteMarker {
		opts := replication.ObjectOpts{
			Name:           oi.Name,
			SSEC:           crypto.SSEC.IsEncrypted(oi.UserDefined),
			UserTags:       oi.UserTags,
			DeleteMarker:   oi.DeleteMarker,
			VersionID:      oi.VersionID,
			OpType:         replication.DeleteReplicationType,
			ExistingObject: true,
		}

		tgtArns := c.Config.FilterTargetArns(opts)
		// indicates no matching target with Existing object replication enabled.
		if len(tgtArns) == 0 {
			return
		}
		for _, t := range tgtArns {
			opts.TargetArn = t
			// Update replication decision for target based on existing object replciation rule.
			dsc.Set(newReplicateTargetDecision(t, c.Replicate(opts), false))
		}
		return c.resync(oi, dsc, tgtStatuses)
	}

	// Ignore previous replication status when deciding if object can be re-replicated
	objInfo := oi.Clone()
	objInfo.ReplicationStatusInternal = ""
	objInfo.VersionPurgeStatusInternal = ""
	objInfo.ReplicationStatus = ""
	objInfo.VersionPurgeStatus = ""
	delete(objInfo.UserDefined, xhttp.AmzBucketReplicationStatus)
	resyncdsc := mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(objInfo, replication.ExistingObjectReplicationType, ObjectOptions{}))
	dsc = &resyncdsc
	return c.resync(oi, dsc, tgtStatuses)
}

// wrapper function for testability. Returns true if a new reset is requested on
// already replicated objects OR object qualifies for existing object replication
// and no reset requested.
func (c replicationConfig) resync(oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
	r = ResyncDecision{
		targets: make(map[string]ResyncTargetDecision),
	}
	if c.remotes == nil {
		return
	}
	for _, tgt := range c.remotes.Targets {
		d, ok := dsc.targetsMap[tgt.Arn]
		if !ok {
			continue
		}
		if !d.Replicate {
			continue
		}
		r.targets[d.Arn] = resyncTarget(oi, tgt.Arn, tgt.ResetID, tgt.ResetBeforeDate, tgtStatuses[tgt.Arn])
	}
	return
}

func targetResetHeader(arn string) string {
	return fmt.Sprintf("%s-%s", ReservedMetadataPrefixLower+ReplicationReset, arn)
}

func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate time.Time, tgtStatus replication.StatusType) (rd ResyncTargetDecision) {
	rd = ResyncTargetDecision{
		ResetID:         resetID,
		ResetBeforeDate: resetBeforeDate,
	}
	rs, ok := oi.UserDefined[targetResetHeader(arn)]
	if !ok {
		rs, ok = oi.UserDefined[xhttp.MinIOReplicationResetStatus] // for backward compatibility
	}
	if !ok { // existing object replication is enabled and object version is unreplicated so far.
		if resetID != "" && oi.ModTime.Before(resetBeforeDate) { // trigger replication if `mc replicate reset` requested
			rd.Replicate = true
			return
		}
		// For existing object reset - this condition is needed
		rd.Replicate = tgtStatus == ""
		return
	}
	if resetID == "" || resetBeforeDate.Equal(timeSentinel) { // no reset in progress
		return
	}

	// if already replicated, return true if a new reset was requested.
	splits := strings.SplitN(rs, ";", 2)
	if len(splits) != 2 {
		return
	}
	newReset := splits[1] != resetID
	if !newReset && tgtStatus == replication.Completed {
		// already replicated and no reset requested
		return
	}
	rd.Replicate = newReset && oi.ModTime.Before(resetBeforeDate)
	return
}

const resyncTimeInterval = time.Minute * 1

// PersistToDisk persists in-memory resync metadata stats to disk at periodic intervals
func (s *replicationResyncer) PersistToDisk(ctx context.Context, objectAPI ObjectLayer) {
	resyncTimer := time.NewTimer(resyncTimeInterval)
	defer resyncTimer.Stop()

	// For each bucket name, store the last timestamp of the
	// successful save of replication status in the backend disks.
	lastResyncStatusSave := make(map[string]time.Time)

	for {
		select {
		case <-resyncTimer.C:
			s.RLock()
			for bucket, brs := range s.statusMap {
				var updt bool
				// Save the replication status if one resync to any bucket target is still not finished
				for _, st := range brs.TargetsMap {
					if st.LastUpdate.Equal(timeSentinel) {
						updt = true
						break
					}
				}
				// Save the replication status if a new stats update is found and not saved in the backend yet
				if brs.LastUpdate.After(lastResyncStatusSave[bucket]) {
					updt = true
				}
				if updt {
					if err := saveResyncStatus(ctx, bucket, brs, objectAPI); err != nil {
						logger.LogIf(ctx, fmt.Errorf("Could not save resync metadata to drive for %s - %w", bucket, err))
					} else {
						lastResyncStatusSave[bucket] = brs.LastUpdate
					}
				}
			}
			s.RUnlock()

			resyncTimer.Reset(resyncTimeInterval)
		case <-ctx.Done():
			// server could be restarting - need
			// to exit immediately
			return
		}
	}
}

const resyncWorkerCnt = 50 // limit of number of bucket resyncs is progress at any given time

func newresyncer() *replicationResyncer {
	rs := replicationResyncer{
		statusMap:      make(map[string]BucketReplicationResyncStatus),
		workerSize:     resyncWorkerCnt,
		resyncCancelCh: make(chan struct{}, resyncWorkerCnt),
		workerCh:       make(chan struct{}, resyncWorkerCnt),
	}
	for i := 0; i < rs.workerSize; i++ {
		rs.workerCh <- struct{}{}
	}
	return &rs
}

// resyncBucket resyncs all qualifying objects as per replication rules for the target
// ARN
func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
	select {
	case <-s.workerCh: // block till a worker is available
	case <-ctx.Done():
		return
	}

	resyncStatus := ResyncFailed
	defer func() {
		s.Lock()
		m := s.statusMap[opts.bucket]
		st := m.TargetsMap[opts.arn]
		st.LastUpdate = UTCNow()
		st.ResyncStatus = resyncStatus
		m.TargetsMap[opts.arn] = st
		m.LastUpdate = UTCNow()
		s.statusMap[opts.bucket] = m
		s.Unlock()
		globalSiteResyncMetrics.incBucket(opts, resyncStatus)
		s.workerCh <- struct{}{}
	}()
	// Allocate new results channel to receive ObjectInfo.
	objInfoCh := make(chan ObjectInfo)
	cfg, err := getReplicationConfig(ctx, opts.bucket)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
		return
	}
	tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, opts.bucket)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed  %w", opts.bucket, opts.arn, err))
		return
	}
	rcfg := replicationConfig{
		Config:  cfg,
		remotes: tgts,
	}
	tgtArns := cfg.FilterTargetArns(
		replication.ObjectOpts{
			OpType:    replication.ResyncReplicationType,
			TargetArn: opts.arn,
		})
	if len(tgtArns) != 1 {
		logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
		return
	}
	tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, opts.arn)
	if tgt == nil {
		logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
		return
	}
	// mark resync status as resync started
	if !heal {
		s.Lock()
		m := s.statusMap[opts.bucket]
		st := m.TargetsMap[opts.arn]
		st.ResyncStatus = ResyncStarted
		m.TargetsMap[opts.arn] = st
		m.LastUpdate = UTCNow()
		s.statusMap[opts.bucket] = m
		s.Unlock()
	}
	// Walk through all object versions - Walk() is always in ascending order needed to ensure
	// delete marker replicated to target after object version is first created.
	if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil {
		logger.LogIf(ctx, err)
		return
	}

	s.RLock()
	m := s.statusMap[opts.bucket]
	st := m.TargetsMap[opts.arn]
	s.RUnlock()
	var lastCheckpoint string
	if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
		lastCheckpoint = st.Object
	}
	for obj := range objInfoCh {
		select {
		case <-s.resyncCancelCh:
			resyncStatus = ResyncCanceled
			return
		default:
		}
		if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name {
			continue
		}
		lastCheckpoint = ""

		roi := getHealReplicateObjectInfo(obj, rcfg)
		if !roi.ExistingObjResync.mustResync() {
			continue
		}
		traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
		if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
			versionID := ""
			dmVersionID := ""
			if roi.VersionPurgeStatus.Empty() {
				dmVersionID = roi.VersionID
			} else {
				versionID = roi.VersionID
			}

			doi := DeletedObjectReplicationInfo{
				DeletedObject: DeletedObject{
					ObjectName:            roi.Name,
					DeleteMarkerVersionID: dmVersionID,
					VersionID:             versionID,
					ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
					DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
					DeleteMarker:          roi.DeleteMarker,
				},
				Bucket:    roi.Bucket,
				OpType:    replication.ExistingObjectReplicationType,
				EventType: ReplicateExistingDelete,
			}
			replicateDelete(ctx, doi, objectAPI)
		} else {
			roi.OpType = replication.ExistingObjectReplicationType
			roi.EventType = ReplicateExisting
			replicateObject(ctx, roi, objectAPI)
		}
		_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
			VersionID: roi.VersionID,
			Internal: minio.AdvancedGetOptions{
				ReplicationProxyRequest: "false",
			},
		})
		s.Lock()
		m = s.statusMap[opts.bucket]
		st = m.TargetsMap[opts.arn]
		st.Object = roi.Name
		success := true
		if err != nil {
			if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
				st.ReplicatedCount++
			} else {
				st.FailedCount++
				success = false
			}
		} else {
			st.ReplicatedCount++
			st.ReplicatedSize += roi.Size
		}
		m.TargetsMap[opts.arn] = st
		m.LastUpdate = UTCNow()
		s.statusMap[opts.bucket] = m
		s.Unlock()
		traceFn(err)
		globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
	}
	resyncStatus = ResyncCompleted
}

// start replication resync for the remote target ARN specified
func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opts resyncOpts) error {
	if opts.bucket == "" {
		return fmt.Errorf("bucket name is empty")
	}
	if opts.arn == "" {
		return fmt.Errorf("target ARN specified for resync is empty")
	}
	// Check if the current bucket has quota restrictions, if not skip it
	cfg, err := getReplicationConfig(ctx, opts.bucket)
	if err != nil {
		return err
	}
	tgtArns := cfg.FilterTargetArns(
		replication.ObjectOpts{
			OpType:    replication.ResyncReplicationType,
			TargetArn: opts.arn,
		})

	if len(tgtArns) == 0 {
		return fmt.Errorf("arn %s specified for resync not found in replication config", opts.arn)
	}
	globalReplicationPool.resyncer.RLock()
	data, ok := globalReplicationPool.resyncer.statusMap[opts.bucket]
	globalReplicationPool.resyncer.RUnlock()
	if !ok {
		data, err = loadBucketResyncMetadata(ctx, opts.bucket, objAPI)
		if err != nil {
			return err
		}
	}
	// validate if resync is in progress for this arn
	for tArn, st := range data.TargetsMap {
		if opts.arn == tArn && (st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncPending) {
			return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", opts.bucket, opts.arn)
		}
	}

	status := TargetReplicationResyncStatus{
		ResyncID:         opts.resyncID,
		ResyncBeforeDate: opts.resyncBefore,
		StartTime:        UTCNow(),
		ResyncStatus:     ResyncPending,
		Bucket:           opts.bucket,
	}
	data.TargetsMap[opts.arn] = status
	if err = saveResyncStatus(ctx, opts.bucket, data, objAPI); err != nil {
		return err
	}

	globalReplicationPool.resyncer.Lock()
	defer globalReplicationPool.resyncer.Unlock()
	brs, ok := globalReplicationPool.resyncer.statusMap[opts.bucket]
	if !ok {
		brs = BucketReplicationResyncStatus{
			Version:    resyncMetaVersion,
			TargetsMap: make(map[string]TargetReplicationResyncStatus),
		}
	}
	brs.TargetsMap[opts.arn] = status
	globalReplicationPool.resyncer.statusMap[opts.bucket] = brs
	go globalReplicationPool.resyncer.resyncBucket(GlobalContext, objAPI, false, opts)
	return nil
}

func (s *replicationResyncer) trace(resyncID string, path string) func(err error) {
	startTime := time.Now()
	return func(err error) {
		duration := time.Since(startTime)
		if globalTrace.NumSubscribers(madmin.TraceReplicationResync) > 0 {
			globalTrace.Publish(replicationResyncTrace(resyncID, startTime, duration, path, err))
		}
	}
}

func replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo {
	var errStr string
	if err != nil {
		errStr = err.Error()
	}
	funcName := fmt.Sprintf("replication.(resyncID=%s)", resyncID)
	return madmin.TraceInfo{
		TraceType: madmin.TraceReplicationResync,
		Time:      startTime,
		NodeName:  globalLocalNodeName,
		FuncName:  funcName,
		Duration:  duration,
		Path:      path,
		Error:     errStr,
	}
}

// delete resync metadata from replication resync state in memory
func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket string) {
	if p == nil {
		return
	}
	p.resyncer.Lock()
	delete(p.resyncer.statusMap, bucket)
	defer p.resyncer.Unlock()

	globalSiteResyncMetrics.deleteBucket(bucket)
}

// initResync - initializes bucket replication resync for all buckets.
func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
	if objAPI == nil {
		return errServerNotInitialized
	}
	// Load bucket metadata sys in background
	go p.startResyncRoutine(ctx, buckets, objAPI)
	return nil
}

func (p *ReplicationPool) startResyncRoutine(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	// Run the replication resync in a loop
	for {
		if err := p.loadResync(ctx, buckets, objAPI); err == nil {
			<-ctx.Done()
			return
		}
		duration := time.Duration(r.Float64() * float64(time.Minute))
		if duration < time.Second {
			// Make sure to sleep atleast a second to avoid high CPU ticks.
			duration = time.Second
		}
		time.Sleep(duration)
	}
}

// Loads bucket replication resync statuses into memory.
func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
	// Make sure only one node running resync on the cluster.
	ctx, cancel := globalLeaderLock.GetLock(ctx)
	defer cancel()

	for index := range buckets {
		meta, err := loadBucketResyncMetadata(ctx, buckets[index].Name, objAPI)
		if err != nil {
			if !errors.Is(err, errVolumeNotFound) {
				logger.LogIf(ctx, err)
			}
			continue
		}

		p.resyncer.Lock()
		p.resyncer.statusMap[buckets[index].Name] = meta
		p.resyncer.Unlock()
	}
	for index := range buckets {
		bucket := buckets[index].Name
		var tgts map[string]TargetReplicationResyncStatus
		p.resyncer.RLock()
		m, ok := p.resyncer.statusMap[bucket]
		if ok {
			tgts = m.cloneTgtStats()
		}
		p.resyncer.RUnlock()
		for arn, st := range tgts {
			switch st.ResyncStatus {
			case ResyncFailed, ResyncStarted, ResyncPending:
				go p.resyncer.resyncBucket(ctx, objAPI, true, resyncOpts{
					bucket:       bucket,
					arn:          arn,
					resyncID:     st.ResyncID,
					resyncBefore: st.ResyncBeforeDate,
				})
			}
		}
	}
	return nil
}

// load bucket resync metadata from disk
func loadBucketResyncMetadata(ctx context.Context, bucket string, objAPI ObjectLayer) (brs BucketReplicationResyncStatus, e error) {
	brs = newBucketResyncStatus(bucket)
	resyncDirPath := path.Join(bucketMetaPrefix, bucket, replicationDir)
	data, err := readConfig(GlobalContext, objAPI, pathJoin(resyncDirPath, resyncFileName))
	if err != nil && err != errConfigNotFound {
		return brs, err
	}
	if len(data) == 0 {
		// Seems to be empty.
		return brs, nil
	}
	if len(data) <= 4 {
		return brs, fmt.Errorf("replication resync: no data")
	}
	// Read resync meta header
	switch binary.LittleEndian.Uint16(data[0:2]) {
	case resyncMetaFormat:
	default:
		return brs, fmt.Errorf("resyncMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
	}
	switch binary.LittleEndian.Uint16(data[2:4]) {
	case resyncMetaVersion:
	default:
		return brs, fmt.Errorf("resyncMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
	}
	// OK, parse data.
	if _, err = brs.UnmarshalMsg(data[4:]); err != nil {
		return brs, err
	}

	switch brs.Version {
	case resyncMetaVersionV1:
	default:
		return brs, fmt.Errorf("unexpected resync meta version: %d", brs.Version)
	}
	return brs, nil
}

// save resync status to resync.bin
func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationResyncStatus, objectAPI ObjectLayer) error {
	data := make([]byte, 4, brs.Msgsize()+4)

	// Initialize the resync meta header.
	binary.LittleEndian.PutUint16(data[0:2], resyncMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], resyncMetaVersion)

	buf, err := brs.MarshalMsg(data)
	if err != nil {
		return err
	}

	configFile := path.Join(bucketMetaPrefix, bucket, replicationDir, resyncFileName)
	return saveConfig(ctx, objectAPI, configFile, buf)
}

// getReplicationDiff returns un-replicated objects in a channel.
// If a non-nil channel is returned it must be consumed fully or
// the provided context must be canceled.
func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (chan madmin.DiffInfo, error) {
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}
	tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
	if err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}

	objInfoCh := make(chan ObjectInfo, 10)
	if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}
	rcfg := replicationConfig{
		Config:  cfg,
		remotes: tgts,
	}
	diffCh := make(chan madmin.DiffInfo, 4000)
	go func() {
		defer close(diffCh)
		for obj := range objInfoCh {
			if contextCanceled(ctx) {
				// Just consume input...
				continue
			}
			// Ignore object prefixes which are excluded
			// from versioning via the MinIO bucket versioning extension.
			if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) {
				continue
			}
			roi := getHealReplicateObjectInfo(obj, rcfg)
			switch roi.ReplicationStatus {
			case replication.Completed, replication.Replica:
				if !opts.Verbose {
					continue
				}
				fallthrough
			default:
				// ignore pre-existing objects that don't satisfy replication rule(s)
				if roi.ReplicationStatus.Empty() && !roi.ExistingObjResync.mustResync() {
					continue
				}
				tgtsMap := make(map[string]madmin.TgtDiffInfo)
				for arn, st := range roi.TargetStatuses {
					if opts.ARN == "" || opts.ARN == arn {
						if !opts.Verbose && (st == replication.Completed || st == replication.Replica) {
							continue
						}
						tgtsMap[arn] = madmin.TgtDiffInfo{
							ReplicationStatus: st.String(),
						}
					}
				}
				for arn, st := range roi.TargetPurgeStatuses {
					if opts.ARN == "" || opts.ARN == arn {
						if !opts.Verbose && st == Complete {
							continue
						}
						t, ok := tgtsMap[arn]
						if !ok {
							t = madmin.TgtDiffInfo{}
						}
						t.DeleteReplicationStatus = string(st)
						tgtsMap[arn] = t
					}
				}
				select {
				case diffCh <- madmin.DiffInfo{
					Object:                  obj.Name,
					VersionID:               obj.VersionID,
					LastModified:            obj.ModTime,
					IsDeleteMarker:          obj.DeleteMarker,
					ReplicationStatus:       string(roi.ReplicationStatus),
					DeleteReplicationStatus: string(roi.VersionPurgeStatus),
					ReplicationTimestamp:    roi.ReplicationTimestamp,
					Targets:                 tgtsMap,
				}:
				case <-ctx.Done():
					continue
				}
			}
		}
	}()
	return diffCh, nil
}

// QueueReplicationHeal is a wrapper for queueReplicationHeal
func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) {
	// un-versioned or a prefix
	if oi.VersionID == "" || oi.ModTime.IsZero() {
		return
	}
	rcfg, _ := getReplicationConfig(ctx, bucket)
	tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
	queueReplicationHeal(ctx, bucket, oi, replicationConfig{
		Config:  rcfg,
		remotes: tgts,
	})
}

// queueReplicationHeal enqueues objects that failed replication OR eligible for resyncing through
// an ongoing resync operation or via existing objects replication configuration setting.
func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig) (roi ReplicateObjectInfo) {
	// un-versioned or a prefix
	if oi.VersionID == "" || oi.ModTime.IsZero() {
		return roi
	}

	if rcfg.Config == nil || rcfg.remotes == nil {
		return roi
	}
	roi = getHealReplicateObjectInfo(oi, rcfg)
	if !roi.Dsc.ReplicateAny() {
		return
	}
	// early return if replication already done, otherwise we need to determine if this
	// version is an existing object that needs healing.
	if oi.ReplicationStatus == replication.Completed && oi.VersionPurgeStatus.Empty() && !roi.ExistingObjResync.mustResync() {
		return
	}

	if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
		versionID := ""
		dmVersionID := ""
		if roi.VersionPurgeStatus.Empty() {
			dmVersionID = roi.VersionID
		} else {
			versionID = roi.VersionID
		}

		dv := DeletedObjectReplicationInfo{
			DeletedObject: DeletedObject{
				ObjectName:            roi.Name,
				DeleteMarkerVersionID: dmVersionID,
				VersionID:             versionID,
				ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
				DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
				DeleteMarker:          roi.DeleteMarker,
			},
			Bucket:    roi.Bucket,
			OpType:    replication.HealReplicationType,
			EventType: ReplicateHealDelete,
		}
		// heal delete marker replication failure or versioned delete replication failure
		if roi.ReplicationStatus == replication.Pending ||
			roi.ReplicationStatus == replication.Failed ||
			roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending {
			globalReplicationPool.queueReplicaDeleteTask(dv)
			return
		}
		// if replication status is Complete on DeleteMarker and existing object resync required
		if roi.ExistingObjResync.mustResync() && (roi.ReplicationStatus == replication.Completed || roi.ReplicationStatus.Empty()) {
			queueReplicateDeletesWrapper(dv, roi.ExistingObjResync)
			return
		}
		return
	}
	if roi.ExistingObjResync.mustResync() {
		roi.OpType = replication.ExistingObjectReplicationType
	}
	switch roi.ReplicationStatus {
	case replication.Pending, replication.Failed:
		roi.EventType = ReplicateHeal
		globalReplicationPool.queueReplicaTask(roi)
		return
	}
	if roi.ExistingObjResync.mustResync() {
		roi.EventType = ReplicateExisting
		globalReplicationPool.queueReplicaTask(roi)
	}
	return
}

const mrfTimeInterval = 5 * time.Minute

func (p *ReplicationPool) persistMRF() {
	if !p.initialized() {
		return
	}

	var mu sync.Mutex
	entries := make(map[string]MRFReplicateEntry)
	mTimer := time.NewTimer(mrfTimeInterval)
	defer mTimer.Stop()
	saveMRFToDisk := func(drain bool) {
		mu.Lock()
		defer mu.Unlock()
		if len(entries) == 0 {
			return
		}
		cctx := p.ctx
		if drain {
			cctx = context.Background()
			// drain all mrf entries and save to disk
			for e := range p.mrfSaveCh {
				entries[e.versionID] = e
			}
		}
		if err := p.saveMRFEntries(cctx, entries); err != nil {
			logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
		}
		entries = make(map[string]MRFReplicateEntry)
	}
	for {
		select {
		case <-mTimer.C:
			saveMRFToDisk(false)
			mTimer.Reset(mrfTimeInterval)
		case <-p.ctx.Done():
			p.mrfStopCh <- struct{}{}
			close(p.mrfSaveCh)
			saveMRFToDisk(true)
			return
		case <-p.saveStateCh:
			saveMRFToDisk(true)
			return
		case e, ok := <-p.mrfSaveCh:
			if !ok {
				return
			}
			var cnt int
			mu.Lock()
			entries[e.versionID] = e
			cnt = len(entries)
			mu.Unlock()
			if cnt >= cap(p.mrfSaveCh) || len(p.mrfSaveCh) >= int(0.8*float32(cap(p.mrfSaveCh))) {
				saveMRFToDisk(true)
			}
		}
	}
}

func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
	if !p.initialized() {
		return
	}
	select {
	case <-GlobalContext.Done():
		return
	case <-p.mrfStopCh:
		return
	default:
		select {
		case p.mrfSaveCh <- entry:
		default:
		}
	}
}

// save mrf entries to mrf_<uuid>.bin
func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error {
	if !p.initialized() {
		return nil
	}

	if len(entries) == 0 {
		return nil
	}
	v := MRFReplicateEntries{
		Entries: entries,
		Version: mrfMetaVersionV1,
	}
	data := make([]byte, 4, v.Msgsize()+4)

	// Initialize the resync meta header.
	binary.LittleEndian.PutUint16(data[0:2], resyncMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], resyncMetaVersion)

	buf, err := v.MarshalMsg(data)
	if err != nil {
		return err
	}

	configFile := path.Join(replicationMRFDir, mustGetUUID()+".bin")
	err = saveConfig(ctx, p.objLayer, configFile, buf)
	return err
}

// load mrf entries from disk
func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e error) {
	if !p.initialized() {
		return re, nil
	}

	data, err := readConfig(p.ctx, p.objLayer, fileName)
	if err != nil && err != errConfigNotFound {
		return re, err
	}
	if len(data) == 0 {
		// Seems to be empty.
		return re, nil
	}
	if len(data) <= 4 {
		return re, fmt.Errorf("replication mrf: no data")
	}
	// Read resync meta header
	switch binary.LittleEndian.Uint16(data[0:2]) {
	case mrfMetaFormat:
	default:
		return re, fmt.Errorf("replication mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
	}
	switch binary.LittleEndian.Uint16(data[2:4]) {
	case mrfMetaVersion:
	default:
		return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
	}
	// OK, parse data.
	if _, err = re.UnmarshalMsg(data[4:]); err != nil {
		return re, err
	}

	switch re.Version {
	case mrfMetaVersionV1:
	default:
		return re, fmt.Errorf("unexpected mrf meta version: %d", re.Version)
	}
	return re, nil
}

func (p *ReplicationPool) processMRF() {
	if !p.initialized() {
		return
	}
	pTimer := time.NewTimer(mrfTimeInterval)
	defer pTimer.Stop()
	for {
		select {
		case <-pTimer.C:
			// skip healing if all targets are offline
			var offlineCnt int
			tgts := globalBucketTargetSys.ListTargets(p.ctx, "", "")
			for _, tgt := range tgts {
				if globalBucketTargetSys.isOffline(tgt.URL()) {
					offlineCnt++
				}
			}
			if len(tgts) == offlineCnt {
				pTimer.Reset(mrfTimeInterval)
				continue
			}
			objCh := make(chan ObjectInfo)
			cctx, cancelFn := context.WithCancel(p.ctx)
			if err := p.objLayer.Walk(cctx, minioMetaBucket, replicationMRFDir, objCh, ObjectOptions{}); err != nil {
				pTimer.Reset(mrfTimeInterval)
				cancelFn()
				logger.LogIf(p.ctx, err)
				continue
			}
			for item := range objCh {
				if err := p.queueMRFHeal(item.Name); err == nil {
					p.objLayer.DeleteObject(p.ctx, minioMetaBucket, item.Name, ObjectOptions{})
				}
			}
			pTimer.Reset(mrfTimeInterval)
			cancelFn()
		case <-p.ctx.Done():
			return
		}
	}
}

// process sends error logs to the heal channel for an attempt to heal replication.
func (p *ReplicationPool) queueMRFHeal(file string) error {
	if !p.initialized() {
		return errServerNotInitialized
	}

	mrfRec, err := p.loadMRF(file)
	if err != nil {
		return err
	}
	for vID, e := range mrfRec.Entries {
		oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{
			VersionID: vID,
		})
		if err != nil {
			continue
		}
		QueueReplicationHeal(p.ctx, e.Bucket, oi)
	}
	return nil
}

// load replication stats from disk
func (p *ReplicationPool) loadStatsFromDisk() (rs map[string]BucketReplicationStats, e error) {
	if !p.initialized() {
		return map[string]BucketReplicationStats{}, nil
	}

	data, err := readConfig(p.ctx, p.objLayer, getReplicationStatsPath())
	if err != nil {
		if !errors.Is(err, errConfigNotFound) {
			return rs, nil
		}
		return rs, err
	}

	if len(data) <= 4 {
		logger.LogIf(p.ctx, fmt.Errorf("replication stats: no data"))
		return map[string]BucketReplicationStats{}, nil
	}
	// Read repl stats meta header
	switch binary.LittleEndian.Uint16(data[0:2]) {
	case replStatsMetaFormat:
	default:
		return rs, fmt.Errorf("replication stats: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
	}
	switch binary.LittleEndian.Uint16(data[2:4]) {
	case replStatsVersion:
	default:
		return rs, fmt.Errorf("replication stats: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
	}
	ss := BucketStatsMap{}
	if _, err = ss.UnmarshalMsg(data[4:]); err != nil {
		return rs, err
	}
	rs = make(map[string]BucketReplicationStats, len(ss.Stats))
	for bucket, st := range ss.Stats {
		rs[bucket] = st.ReplicationStats
	}

	return rs, nil
}

func (p *ReplicationPool) initialized() bool {
	return !(p == nil || p.objLayer == nil)
}

func (p *ReplicationPool) saveStatsToDisk() {
	if !p.initialized() {
		return
	}
	ctx, cancel := globalLeaderLock.GetLock(p.ctx)
	defer cancel()
	sTimer := time.NewTimer(replStatsSaveInterval)
	defer sTimer.Stop()
	for {
		select {
		case <-sTimer.C:
			dui, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn())
			if err == nil && !dui.LastUpdate.IsZero() {
				globalReplicationStats.getAllLatest(dui.BucketsUsage)
			}
			p.saveStats(p.ctx)
			sTimer.Reset(replStatsSaveInterval)
		case <-ctx.Done():
			return
		}
	}
}

// save replication stats to .minio.sys/buckets/replication/node-name.stats
func (p *ReplicationPool) saveStats(ctx context.Context) error {
	if !p.initialized() {
		return nil
	}

	data, err := globalReplicationStats.serializeStats()
	if data == nil {
		return err
	}
	return saveConfig(ctx, p.objLayer, getReplicationStatsPath(), data)
}