minio/cmd/bucket-replication.go

3105 lines
96 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"path"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/dustin/go-humanize"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/amztime"
"github.com/minio/minio/internal/bucket/bandwidth"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/zeebo/xxh3"
)
// Replication tunables and names of internal replication metadata entries.
// Within this file several of the string constants are appended to
// ReservedMetadataPrefixLower to form keys in an object's UserDefined metadata.
const (
// throttleDeadline is a fixed one-hour duration.
// NOTE(review): its consumer is not visible in this part of the file —
// presumably an upper bound on throttling waits; confirm against the caller.
throttleDeadline = 1 * time.Hour
// ReplicationReset has reset id and timestamp of last reset operation
ReplicationReset = "replication-reset"
// ReplicationStatus has internal replication status - stringified representation of target's replication status for all replication
// activity initiated from this cluster
ReplicationStatus = "replication-status"
// ReplicationTimestamp - the last time replication was initiated on this cluster for this object version
ReplicationTimestamp = "replication-timestamp"
// ReplicaStatus - this header is present if a replica was received by this cluster for this object version
ReplicaStatus = "replica-status"
// ReplicaTimestamp - the last time a replica was received by this cluster for this object version
ReplicaTimestamp = "replica-timestamp"
// TaggingTimestamp - the last time a tag metadata modification happened on this cluster for this object version
TaggingTimestamp = "tagging-timestamp"
// ObjectLockRetentionTimestamp - the last time an object lock metadata modification happened on this cluster for this object version
ObjectLockRetentionTimestamp = "objectlock-retention-timestamp"
// ObjectLockLegalHoldTimestamp - the last time a legal hold metadata modification happened on this cluster for this object version
ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp"
// ReplicationWorkerMultiplier is suggested worker multiplier if traffic exceeds replication worker capacity
ReplicationWorkerMultiplier = 1.5
)
// isReplicationEnabled reports whether getReplicationConfig yields a non-nil
// replication configuration for bucketName. The lookup error is ignored; only
// the returned config pointer is inspected.
func isReplicationEnabled(ctx context.Context, bucketName string) bool {
	if cfg, _ := getReplicationConfig(ctx, bucketName); cfg == nil {
		return false
	}
	return true
}
// getReplicationConfig fetches the replication configuration associated with
// bucketName from the bucket metadata subsystem.
//
// The config and error from the metadata lookup are returned unchanged. A
// "replication config not found" error and errInvalidArgument are passed back
// quietly; any other error is additionally reported via logger.CriticalIf.
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
	rc, _, err = globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
	switch {
	case err == nil:
		// nothing to report
	case errors.Is(err, BucketReplicationConfigNotFound{Bucket: bucketName}),
		errors.Is(err, errInvalidArgument):
		// expected conditions, returned without logging
	default:
		logger.CriticalIf(ctx, err)
	}
	return rc, err
}
// validateReplicationDestination checks every destination ARN referenced by
// rCfg (the RoleArn when set, otherwise each rule's destination) and returns an
// API error if an ARN is malformed, is not a replication ARN, or has no
// registered remote target client.
//
// When checkRemote is true the remote bucket must also exist and, if object
// locking is enabled on the local bucket, the remote bucket must have object
// locking enabled as well.
//
// The boolean result is true when at least one validated target endpoint
// resolves to this server.
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config, checkRemote bool) (bool, APIError) {
	var targetARNs []string
	if rCfg.RoleArn == "" {
		for _, rule := range rCfg.Rules {
			targetARNs = append(targetARNs, rule.Destination.String())
		}
	} else {
		targetARNs = append(targetARNs, rCfg.RoleArn)
	}
	// Without any destination there is nothing to validate.
	if len(targetARNs) == 0 {
		return false, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
	}

	sameTarget := false
	for _, arnStr := range targetARNs {
		parsed, err := madmin.ParseARN(arnStr)
		if err != nil {
			return sameTarget, errorCodes.ToAPIErrWithErr(ErrBucketRemoteArnInvalid, err)
		}
		if parsed.Type != madmin.ReplicationService {
			return sameTarget, toAPIError(ctx, BucketRemoteArnTypeInvalid{Bucket: bucket})
		}
		clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arnStr)
		if clnt == nil {
			return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
		}

		if checkRemote { // validate remote bucket
			exists, err := clnt.BucketExists(ctx, parsed.Bucket)
			if err != nil {
				return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, err)
			}
			if !exists {
				return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, BucketRemoteTargetNotFound{Bucket: parsed.Bucket})
			}
			// A locked source bucket requires a locked destination bucket.
			if lockCfg, err := globalBucketObjectLockSys.Get(bucket); err == nil && lockCfg.LockEnabled {
				remoteLock, _, _, _, err := clnt.GetObjectLockConfig(ctx, parsed.Bucket)
				if err != nil {
					return sameTarget, errorCodes.ToAPIErrWithErr(ErrReplicationDestinationMissingLock, err)
				}
				if remoteLock != objectlock.Enabled {
					return sameTarget, errorCodes.ToAPIErrWithErr(ErrReplicationDestinationMissingLock, nil)
				}
			}
		}

		// validate replication ARN against target endpoint
		c := globalBucketTargetSys.GetRemoteTargetClient(ctx, arnStr)
		if c == nil || c.EndpointURL().String() != clnt.EndpointURL().String() {
			continue
		}
		selfTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
		sameTarget = sameTarget || selfTarget
	}
	return sameTarget, toAPIError(ctx, nil)
}
// mustReplicateOptions bundles the inputs mustReplicate consults when deciding
// whether an object should be replicated. Values are populated by
// getMustReplicateOptions.
type mustReplicateOptions struct {
meta map[string]string // copy of the object's UserDefined metadata, plus the tagging header when the object has tags
status replication.StatusType // taken from ObjectInfo.ReplicationStatus
opType replication.Type // kind of replication operation being evaluated
replicationRequest bool // incoming request is a replication request
}
// ReplicationStatus returns the status stored in the metadata under
// xhttp.AmzBucketReplicationStatus, or the zero-value status when the key is
// absent.
func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
	raw, present := o.meta[xhttp.AmzBucketReplicationStatus]
	if !present {
		return s
	}
	return replication.StatusType(raw)
}
// isExistingObjectReplication reports whether the operation type is
// replication.ExistingObjectReplicationType.
func (o mustReplicateOptions) isExistingObjectReplication() bool {
return o.opType == replication.ExistingObjectReplicationType
}
// isMetadataReplication reports whether the operation type is
// replication.MetadataReplicationType.
func (o mustReplicateOptions) isMetadataReplication() bool {
return o.opType == replication.MetadataReplicationType
}
// getMustReplicateOptions builds the mustReplicateOptions for object o.
//
// When op is not a valid replication type it defaults to metadata replication
// for metadata-only objects and to object replication otherwise. The metadata
// is a clone of o.UserDefined, extended with the tagging header when the
// object carries user tags.
func getMustReplicateOptions(o ObjectInfo, op replication.Type, opts ObjectOptions) mustReplicateOptions {
	opType := op
	if !opType.Valid() {
		if o.metadataOnly {
			opType = replication.MetadataReplicationType
		} else {
			opType = replication.ObjectReplicationType
		}
	}

	md := cloneMSS(o.UserDefined)
	if tagging := o.UserTags; tagging != "" {
		md[xhttp.AmzObjectTagging] = tagging
	}

	var mopts mustReplicateOptions
	mopts.meta = md
	mopts.status = o.ReplicationStatus
	mopts.opType = opType
	mopts.replicationRequest = opts.ReplicationRequest
	return mopts
}
// mustReplicate evaluates the bucket's replication configuration for the given
// object and returns a ReplicateDecision with one entry per target ARN that
// matches the object: whether the object should be replicated to that target
// and whether replication should be done synchronously.
//
// An empty decision is returned when the object layer is not initialized, the
// object's prefix is excluded from versioning, the object is a replica (and
// this is not a metadata replication), the request itself is a replication
// request, or no replication configuration is available for the bucket.
func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (dsc ReplicateDecision) {
	// object layer not initialized we return with no decision.
	if newObjectLayerFn() == nil {
		return dsc
	}

	// Disable server-side replication on object prefixes which are excluded
	// from versioning via the MinIO bucket versioning extension.
	if !globalBucketVersioningSys.PrefixEnabled(bucket, object) {
		return dsc
	}

	replStatus := mopts.ReplicationStatus()
	if replStatus == replication.Replica && !mopts.isMetadataReplication() {
		return dsc
	}

	if mopts.replicationRequest { // incoming replication request on target cluster
		return dsc
	}

	cfg, err := getReplicationConfig(ctx, bucket)
	// Guard against a nil config as well as an error, matching the other
	// getReplicationConfig callers in this file; cfg is dereferenced below.
	if err != nil || cfg == nil {
		return dsc
	}

	opts := replication.ObjectOpts{
		Name:           object,
		SSEC:           crypto.SSEC.IsEncrypted(mopts.meta),
		Replica:        replStatus == replication.Replica,
		ExistingObject: mopts.isExistingObjectReplication(),
	}
	if tagStr, ok := mopts.meta[xhttp.AmzObjectTagging]; ok {
		opts.UserTags = tagStr
	}

	tgtArns := cfg.FilterTargetArns(opts)
	for _, tgtArn := range tgtArns {
		tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
		// the target online status should not be used here while deciding
		// whether to replicate as the target could be temporarily down
		opts.TargetArn = tgtArn
		replicate := cfg.Replicate(opts)
		var synchronous bool
		if tgt != nil {
			synchronous = tgt.replicateSync
		}
		dsc.Set(newReplicateTargetDecision(tgtArn, replicate, synchronous))
	}
	return dsc
}
// standardHeaders lists the standard headers that need to be extracted from
// user metadata. isStandardHeader matches keys against this list
// case-insensitively; putReplicationOpts uses that to keep these keys out of
// the user metadata it forwards.
var standardHeaders = []string{
xhttp.ContentType,
xhttp.CacheControl,
xhttp.ContentEncoding,
xhttp.ContentLanguage,
xhttp.ContentDisposition,
xhttp.AmzStorageClass,
xhttp.AmzObjectTagging,
xhttp.AmzBucketReplicationStatus,
xhttp.AmzObjectLockMode,
xhttp.AmzObjectLockRetainUntilDate,
xhttp.AmzObjectLockLegalHold,
xhttp.AmzTagCount,
xhttp.AmzServerSideEncryption,
}
// hasReplicationRules reports whether at least one of the objects being
// deleted is covered by an active replication rule of the bucket. It returns
// false when the replication configuration cannot be loaded or is nil.
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil {
		return false
	}
	if cfg == nil {
		return false
	}
	for i := range objects {
		if cfg.HasActiveRules(objects[i].ObjectName, true) {
			return true
		}
	}
	return false
}
// isStandardHeader returns true if header is a supported header and not a custom header.
// The comparison against standardHeaders is case-insensitive (see equals).
func isStandardHeader(matchHeaderKey string) bool {
return equals(matchHeaderKey, standardHeaders...)
}
// checkReplicateDelete decides, per replication target, whether the delete of
// dobj (a delete marker or a specific object version) must be replicated.
//
// An empty decision is returned when no replication config is available, when
// the incoming delete is itself a replication request, or when the object's
// prefix is not versioned. gerr is the error from the preceding object info
// lookup; when non-nil the decision is derived from the replication state
// recorded in oi instead of the live target client.
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, delOpts ObjectOptions, gerr error) (dsc ReplicateDecision) {
	cfg, err := getReplicationConfig(ctx, bucket)
	switch {
	case err != nil || cfg == nil:
		return dsc
	case delOpts.ReplicationRequest:
		// If incoming request is a replication request, it does not need to be re-replicated.
		return dsc
	case !delOpts.Versioned:
		// Skip replication if this object's prefix is excluded from being versioned.
		return dsc
	}

	objOpts := replication.ObjectOpts{
		Name:         dobj.ObjectName,
		SSEC:         crypto.SSEC.IsEncrypted(oi.UserDefined),
		UserTags:     oi.UserTags,
		DeleteMarker: oi.DeleteMarker,
		VersionID:    dobj.VersionID,
		OpType:       replication.DeleteReplicationType,
	}
	arns := cfg.FilterTargetArns(objOpts)
	if len(arns) == 0 {
		return dsc
	}

	dsc.targetsMap = make(map[string]replicateTargetDecision, len(arns))
	for _, arn := range arns {
		objOpts.TargetArn = arn
		replicate := cfg.Replicate(objOpts)

		// when incoming delete is removal of a delete marker(a.k.a versioned delete),
		// GetObjectInfo returns extra information even though it returns errFileNotFound
		if gerr != nil {
			var hasReplStatus bool
			switch oi.TargetReplicationStatus(arn) {
			case replication.Pending, replication.Completed, replication.Failed:
				hasReplStatus = true
			}
			if !(oi.DeleteMarker && (hasReplStatus || replicate)) {
				// can be the case that other cluster is down and duplicate `mc rm --vid`
				// is issued - this still needs to be replicated back to the other target
				replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed
			}
			// These decisions are never marked synchronous.
			dsc.Set(newReplicateTargetDecision(arn, replicate, false))
			continue
		}

		tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arn)
		// the target online status should not be used here while deciding
		// whether to replicate deletes as the target could be temporarily down
		decision := newReplicateTargetDecision(arn, false, false)
		if tgt != nil {
			decision = newReplicateTargetDecision(arn, replicate, tgt.replicateSync)
		}
		dsc.Set(decision)
	}
	return dsc
}
// replicate deletes to the designated replication target if replication configuration
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
// is specified) enabled.
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
// on target.
//
// Steps performed below: load the replication config and the stored replicate
// decision, take a replication-specific namespace lock on the object, fan the
// delete out to every target marked for replication, update replication stats,
// queue an MRF entry on failure, and finally record the resulting replication
// state on the source via objectAPI.DeleteObject. An event is sent on every
// exit path and the final status is audit-logged.
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) {
var replicationStatus replication.StatusType
bucket := dobj.Bucket
// Prefer the delete marker's version ID; fall back to the explicit version ID.
versionID := dobj.DeleteMarkerVersionID
if versionID == "" {
versionID = dobj.VersionID
}
// Audit-log whatever replicationStatus holds when the function exits.
defer func() {
replStatus := string(replicationStatus)
auditLogInternal(context.Background(), AuditLogOptions{
Event: dobj.EventType,
APIName: ReplicateDeleteAPI,
Bucket: bucket,
Object: dobj.ObjectName,
VersionID: versionID,
Status: replStatus,
})
}()
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil {
logger.LogIf(ctx, err)
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
Host: "Internal: [Replication]",
EventName: event.ObjectReplicationNotTracked,
})
return
}
// Recover the per-target decision that was stored with the delete.
dsc, err := parseReplicateDecision(dobj.ReplicationState.ReplicateDecisionStr)
if err != nil {
logger.LogIf(ctx, err)
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
Host: "Internal: [Replication]",
EventName: event.ObjectReplicationNotTracked,
})
return
}
// Lock the object name before starting replication operation.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
// Could not acquire the lock: queue an MRF entry so the delete can be retried.
globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn))
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
Host: "Internal: [Replication]",
EventName: event.ObjectReplicationNotTracked,
})
return
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
var wg sync.WaitGroup
var rinfos replicatedInfos
// One slot per target in the decision; slots of skipped targets stay zero-valued.
rinfos.Targets = make([]replicatedTargetInfo, len(dsc.targetsMap))
idx := -1
for tgtArn := range dsc.targetsMap {
idx++
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
if tgt == nil {
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn))
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
Host: "Internal: [Replication]",
EventName: event.ObjectReplicationNotTracked,
})
continue
}
// Note: this inner tgt is the stored decision and shadows the target client only within the if statement.
if tgt := dsc.targetsMap[tgtArn]; !tgt.Replicate {
continue
}
// if dobj.TargetArn is not empty string, this is a case of specific target being re-synced.
if dobj.TargetArn != "" && dobj.TargetArn != tgt.ARN {
continue
}
wg.Add(1)
// Each goroutine writes only to its own index of rinfos.Targets.
go func(index int, tgt *TargetClient) {
defer wg.Done()
rinfo := replicateDeleteToTarget(ctx, dobj, tgt)
rinfos.Targets[index] = rinfo
}(idx, tgt)
}
wg.Wait()
replicationStatus = rinfos.ReplicationStatus()
prevStatus := dobj.DeleteMarkerReplicationStatus()
// For deletes of a specific version the purge status is tracked instead of the replication status.
if dobj.VersionID != "" {
prevStatus = replication.StatusType(dobj.VersionPurgeStatus())
replicationStatus = replication.StatusType(rinfos.VersionPurgeStatus())
}
// to decrement pending count later.
for _, rinfo := range rinfos.Targets {
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, 0, replicationStatus,
prevStatus, replication.DeleteReplicationType)
}
}
eventName := event.ObjectReplicationComplete
if replicationStatus == replication.Failed {
eventName = event.ObjectReplicationFailed
// Failed deletes are queued as MRF entries for retry.
globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
}
drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID)
if replicationStatus != prevStatus {
drs.ReplicationTimeStamp = UTCNow()
}
// Record the updated delete replication state on the source object version.
dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
VersionID: versionID,
MTime: dobj.DeleteMarkerMTime.Time,
DeleteReplication: drs,
Versioned: globalBucketVersioningSys.PrefixEnabled(bucket, dobj.ObjectName),
// Objects matching prefixes should not leave delete markers,
// dramatically reduces namespace pollution while keeping the
// benefits of replication, make sure to apply version suspension
// only at bucket level instead.
VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
})
if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on.
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err))
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
Host: "Internal: [Replication]",
EventName: eventName,
})
} else {
sendEvent(eventArgs{
BucketName: bucket,
Object: dobjInfo,
Host: "Internal: [Replication]",
EventName: eventName,
})
}
}
// replicateDeleteToTarget replicates a single delete (delete marker or version
// purge) to tgt and returns the resulting per-target replication info.
//
// Deletes without an explicit version ID report their outcome through
// rinfo.ReplicationStatus; deletes of a specific version report it through
// rinfo.VersionPurgeStatus. When an existing-object replication completes
// against a target that has a reset ID, the resync timestamp is stamped on
// the result just before returning.
func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationInfo, tgt *TargetClient) (rinfo replicatedTargetInfo) {
	versionID := dobj.VersionID
	if dobj.DeleteMarkerVersionID != "" {
		versionID = dobj.DeleteMarkerVersionID
	}

	rinfo = dobj.ReplicationState.targetState(tgt.ARN)
	rinfo.OpType = dobj.OpType

	// Runs after every return below and may still adjust the named result.
	defer func() {
		if rinfo.ReplicationStatus != replication.Completed ||
			tgt.ResetID == "" ||
			dobj.OpType != replication.ExistingObjectReplicationType {
			return
		}
		rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
	}()

	// markFailed records a failure on the status field that matches the delete kind.
	markFailed := func() {
		if dobj.VersionID == "" {
			rinfo.ReplicationStatus = replication.Failed
			return
		}
		rinfo.VersionPurgeStatus = Failed
	}

	switch {
	case dobj.VersionID == "" &&
		rinfo.PrevReplicationStatus == replication.Completed &&
		dobj.OpType != replication.ExistingObjectReplicationType:
		// already replicated earlier; carry the previous status forward
		rinfo.ReplicationStatus = rinfo.PrevReplicationStatus
		return rinfo
	case dobj.VersionID != "" && rinfo.VersionPurgeStatus == Complete:
		// version purge already completed on this target
		return rinfo
	}

	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
		logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", dobj.Bucket, tgt.ARN))
		sendEvent(eventArgs{
			BucketName: dobj.Bucket,
			Object: ObjectInfo{
				Bucket:       dobj.Bucket,
				Name:         dobj.ObjectName,
				VersionID:    dobj.VersionID,
				DeleteMarker: dobj.DeleteMarker,
			},
			Host:      "Internal: [Replication]",
			EventName: event.ObjectReplicationNotTracked,
		})
		markFailed()
		return rinfo
	}

	// early return if already replicated delete marker for existing object replication/ healing delete markers
	if dobj.DeleteMarkerVersionID != "" {
		statOpts := minio.StatObjectOptions{
			VersionID: versionID,
			Internal: minio.AdvancedGetOptions{
				ReplicationProxyRequest:           "false",
				IsReplicationReadyForDeleteMarker: true,
			},
		}
		toi, err := tgt.StatObject(ctx, tgt.Bucket, dobj.ObjectName, statOpts)
		if isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) && dobj.VersionID == "" {
			rinfo.ReplicationStatus = replication.Completed
			return rinfo
		}
		// mark delete marker replication as failed if target cluster not ready to receive
		// this request yet (object version not replicated yet)
		if err != nil && !toi.ReplicationReady {
			rinfo.ReplicationStatus = replication.Failed
			return rinfo
		}
	}

	rmOpts := minio.RemoveObjectOptions{
		VersionID: versionID,
		Internal: minio.AdvancedRemoveOptions{
			ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
			ReplicationMTime:        dobj.DeleteMarkerMTime.Time,
			ReplicationStatus:       minio.ReplicationStatusReplica,
			ReplicationRequest:      true, // always set this to distinguish between `mc mirror` replication and serverside
		},
	}
	if rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, rmOpts); rmErr != nil {
		markFailed()
		logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", tgt.Bucket, dobj.ObjectName, versionID, rmErr))
		return rinfo
	}

	if dobj.VersionID == "" {
		rinfo.ReplicationStatus = replication.Completed
	} else {
		rinfo.VersionPurgeStatus = Complete
	}
	return rinfo
}
// getCopyObjMetadata builds the metadata map for a metadata-only copy of oi.
//
// User-defined entries are copied except for reserved internal keys, the
// replication status header and the unencrypted content length/MD5 headers.
// Content encoding, content type and tags are then added when present, the
// storage class is carried over only for STANDARD/RRS (sc overrides the
// object's own class when non-empty), and the source ETag, source mod time and
// a "replica" replication status are always set.
func getCopyObjMetadata(oi ObjectInfo, sc string) map[string]string {
	meta := make(map[string]string, len(oi.UserDefined))
	for k, v := range oi.UserDefined {
		switch {
		case strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower):
			// internal metadata is never forwarded
		case equals(k, xhttp.AmzBucketReplicationStatus):
			// replaced with the replica status below
		case equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5):
			// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
		default:
			meta[k] = v
		}
	}

	if enc := oi.ContentEncoding; enc != "" {
		meta[xhttp.ContentEncoding] = enc
	}
	if ct := oi.ContentType; ct != "" {
		meta[xhttp.ContentType] = ct
	}
	if tagging := oi.UserTags; tagging != "" {
		meta[xhttp.AmzObjectTagging] = tagging
		meta[xhttp.AmzTagDirective] = "REPLACE"
	}

	storageClass := sc
	if storageClass == "" {
		storageClass = oi.StorageClass
	}
	// drop non standard storage classes for tiering from replication
	if storageClass != "" && (storageClass == storageclass.RRS || storageClass == storageclass.STANDARD) {
		meta[xhttp.AmzStorageClass] = storageClass
	}

	meta[xhttp.MinIOSourceETag] = oi.ETag
	meta[xhttp.MinIOSourceMTime] = oi.ModTime.UTC().Format(time.RFC3339Nano)
	meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
	return meta
}
// caseInsensitiveMap is a string map whose Lookup tolerates the common
// spellings of an HTTP header key.
type caseInsensitiveMap map[string]string

// Lookup returns the value stored for key, trying in order the key exactly as
// given, its lower-cased form, and its canonical HTTP header form. The boolean
// is false when none of the three spellings is present.
func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
	if len(m) == 0 {
		return "", false
	}
	if v, ok := m[key]; ok {
		return v, true
	}
	if v, ok := m[strings.ToLower(key)]; ok {
		return v, true
	}
	if v, ok := m[http.CanonicalHeaderKey(key)]; ok {
		return v, true
	}
	return "", false
}
// putReplicationOpts builds the PutObjectOptions used to replicate objInfo.
//
// User metadata excludes reserved internal keys and standard headers; the
// standard headers are mapped onto their dedicated option fields instead. sc
// overrides the storage class; when empty, the object's own class is used only
// if it is STANDARD or RRS. Tag, retention and legal-hold timestamps come from
// the matching internal metadata entry when present and default to the
// object's mod time otherwise.
//
// An error is returned when a stored timestamp or the retain-until date cannot
// be parsed; putOpts is then only partially populated.
func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts minio.PutObjectOptions, err error) {
	userMeta := make(map[string]string)
	for k, v := range objInfo.UserDefined {
		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) || isStandardHeader(k) {
			continue
		}
		userMeta[k] = v
	}

	if sc == "" {
		switch objInfo.StorageClass {
		case storageclass.STANDARD, storageclass.RRS:
			sc = objInfo.StorageClass
		}
	}

	// internalTimestamp returns the timestamp stored under the reserved
	// metadata key with the given suffix, or the object's mod time when the
	// key is absent.
	internalTimestamp := func(suffix string) (time.Time, error) {
		raw, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+suffix]
		if !ok {
			return objInfo.ModTime, nil
		}
		return time.Parse(time.RFC3339Nano, raw)
	}

	putOpts.UserMetadata = userMeta
	putOpts.ContentType = objInfo.ContentType
	putOpts.ContentEncoding = objInfo.ContentEncoding
	putOpts.StorageClass = sc
	putOpts.Internal = minio.AdvancedPutOptions{
		SourceVersionID:    objInfo.VersionID,
		ReplicationStatus:  minio.ReplicationStatusReplica,
		SourceMTime:        objInfo.ModTime,
		SourceETag:         objInfo.ETag,
		ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
	}

	if objInfo.UserTags != "" {
		// Tags that fail to parse are skipped rather than failing the replication.
		if parsed, _ := tags.ParseObjectTags(objInfo.UserTags); parsed != nil {
			putOpts.UserTags = parsed.ToMap()
			// set tag timestamp in opts
			ts, err := internalTimestamp(TaggingTimestamp)
			if err != nil {
				return putOpts, err
			}
			putOpts.Internal.TaggingTimestamp = ts
		}
	}

	headers := caseInsensitiveMap(objInfo.UserDefined)
	if v, ok := headers.Lookup(xhttp.ContentLanguage); ok {
		putOpts.ContentLanguage = v
	}
	if v, ok := headers.Lookup(xhttp.ContentDisposition); ok {
		putOpts.ContentDisposition = v
	}
	if v, ok := headers.Lookup(xhttp.CacheControl); ok {
		putOpts.CacheControl = v
	}
	if v, ok := headers.Lookup(xhttp.AmzObjectLockMode); ok {
		putOpts.Mode = minio.RetentionMode(v)
	}

	if v, ok := headers.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
		until, err := amztime.ISO8601Parse(v)
		if err != nil {
			return putOpts, err
		}
		putOpts.RetainUntilDate = until
		// set retention timestamp in opts
		ts, err := internalTimestamp(ObjectLockRetentionTimestamp)
		if err != nil {
			return putOpts, err
		}
		putOpts.Internal.RetentionTimestamp = ts
	}

	if v, ok := headers.Lookup(xhttp.AmzObjectLockLegalHold); ok {
		putOpts.LegalHold = minio.LegalHoldStatus(v)
		// set legalhold timestamp in opts
		ts, err := internalTimestamp(ObjectLockLegalHoldTimestamp)
		if err != nil {
			return putOpts, err
		}
		putOpts.Internal.LegalholdTimestamp = ts
	}

	if crypto.S3.IsEncrypted(objInfo.UserDefined) {
		putOpts.ServerSideEncryption = encrypt.NewSSE()
	}
	return putOpts, nil
}
// replicationAction describes how much of an object has to be sent to a
// target, as decided by getReplicationAction.
type replicationAction string
const (
// replicateMetadata - only metadata needs to be replicated
replicateMetadata replicationAction = "metadata"
// replicateNone - nothing needs to be replicated
replicateNone replicationAction = "none"
// replicateAll - the object needs full replication
replicateAll replicationAction = "all"
)
// equals reports whether k1 matches at least one of keys, ignoring case
// (Unicode case folding). It returns false when no keys are supplied.
func equals(k1 string, keys ...string) bool {
	for i := range keys {
		if strings.EqualFold(keys[i], k1) {
			return true
		}
	}
	return false
}
// getReplicationAction compares the source object oi1 with the target's view
// oi2 and returns what has to be replicated:
//
//   - replicateAll when ETag, version ID, size, delete-marker flag or mod time
//     (second precision) differ;
//   - replicateMetadata when only content type, content encoding, tags or any
//     of the compared headers differ;
//   - replicateNone when nothing differs, and also for existing-object
//     replication of a null version whose source mod time is later than the
//     target's.
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo, opType replication.Type) replicationAction {
	srcUnix, tgtUnix := oi1.ModTime.Unix(), oi2.LastModified.Unix()

	// Avoid resyncing null versions created prior to enabling replication if target has a newer copy
	if opType == replication.ExistingObjectReplicationType && srcUnix > tgtUnix && oi1.VersionID == nullVersionID {
		return replicateNone
	}

	switch {
	case oi1.ETag != oi2.ETag,
		oi1.VersionID != oi2.VersionID,
		oi1.Size != oi2.Size,
		oi1.DeleteMarker != oi2.IsDeleteMarker,
		srcUnix != tgtUnix:
		// needs full replication
		return replicateAll
	case oi1.ContentType != oi2.ContentType:
		return replicateMetadata
	}

	if oi1.ContentEncoding != "" {
		enc, ok := oi2.Metadata[xhttp.ContentEncoding]
		if !ok {
			// fall back to the lower-cased header key
			if enc, ok = oi2.Metadata[strings.ToLower(xhttp.ContentEncoding)]; !ok {
				return replicateMetadata
			}
		}
		if strings.Join(enc, ",") != oi1.ContentEncoding {
			return replicateMetadata
		}
	}

	t, _ := tags.ParseObjectTags(oi1.UserTags)
	srcTags := t.ToMap()
	if !reflect.DeepEqual(oi2.UserTags, srcTags) || oi2.UserTagCount != len(srcTags) {
		return replicateMetadata
	}

	// Compare only necessary headers
	compareKeys := []string{
		"Expires",
		"Cache-Control",
		"Content-Language",
		"Content-Disposition",
		"X-Amz-Object-Lock-Mode",
		"X-Amz-Object-Lock-Retain-Until-Date",
		"X-Amz-Object-Lock-Legal-Hold",
		"X-Amz-Website-Redirect-Location",
		"X-Amz-Meta-",
	}
	lowered := make([]string, len(compareKeys))
	for i, p := range compareKeys {
		lowered[i] = strings.ToLower(p)
	}
	// tracked reports whether an already lower-cased key starts with one of
	// the compared header prefixes.
	tracked := func(lowerKey string) bool {
		for _, p := range lowered {
			if strings.HasPrefix(lowerKey, p) {
				return true
			}
		}
		return false
	}

	// compare metadata on both maps to see if meta is identical
	srcMeta := make(map[string]string)
	for k, v := range oi1.UserDefined {
		if lk := strings.ToLower(k); tracked(lk) {
			srcMeta[lk] = v
		}
	}
	tgtMeta := make(map[string]string)
	for k, v := range oi2.Metadata {
		if lk := strings.ToLower(k); tracked(lk) {
			tgtMeta[lk] = strings.Join(v, ",")
		}
	}

	if reflect.DeepEqual(srcMeta, tgtMeta) {
		return replicateNone
	}
	return replicateMetadata
}
// replicateObject replicates the specified version of the object to every
// destination target that matches the bucket's replication configuration.
// The source object is then updated to reflect the replication status.
//
// Steps: load the replication config, take a replication-specific namespace
// lock on the object, replicate to all targets concurrently, persist the new
// internal replication status (and any resync timestamps) on the source
// object, update replication stats, emit an event, and queue an MRF entry if
// the overall status is not Completed. The outcome is audit-logged on every
// exit path.
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
	var replicationStatus replication.StatusType
	defer func() {
		if replicationStatus.Empty() {
			// replication status is empty means
			// replication was not attempted for some
			// reason, notify the state of the object
			// on disk.
			replicationStatus = ri.ReplicationStatus
		}
		auditLogInternal(ctx, AuditLogOptions{
			Event:     ri.EventType,
			APIName:   ReplicateObjectAPI,
			Bucket:    ri.Bucket,
			Object:    ri.Name,
			VersionID: ri.VersionID,
			Status:    replicationStatus.String(),
		})
	}()

	objInfo := ri.ObjectInfo
	bucket := objInfo.Bucket
	object := objInfo.Name

	cfg, err := getReplicationConfig(ctx, bucket)
	// A nil config is treated like an error, matching replicateDelete; cfg is
	// dereferenced right below.
	if err != nil || cfg == nil {
		logger.LogIf(ctx, err)
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		return
	}
	tgtArns := cfg.FilterTargetArns(replication.ObjectOpts{
		Name:     object,
		SSEC:     crypto.SSEC.IsEncrypted(objInfo.UserDefined),
		UserTags: objInfo.UserTags,
	})

	// Lock the object name before starting replication.
	// Use separate lock that doesn't collide with regular objects.
	lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object)
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
		// Could not acquire the lock: queue an MRF entry so it can be retried.
		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn))
		return
	}
	ctx = lkctx.Context()
	defer lk.Unlock(lkctx)

	var wg sync.WaitGroup
	var rinfos replicatedInfos
	// One slot per target; slots of targets without a client stay zero-valued.
	rinfos.Targets = make([]replicatedTargetInfo, len(tgtArns))
	for i, tgtArn := range tgtArns {
		tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
		if tgt == nil {
			logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn))
			sendEvent(eventArgs{
				EventName:  event.ObjectReplicationNotTracked,
				BucketName: bucket,
				Object:     objInfo,
				Host:       "Internal: [Replication]",
			})
			continue
		}
		wg.Add(1)
		// Each goroutine writes only to its own index of rinfos.Targets.
		go func(index int, tgt *TargetClient) {
			defer wg.Done()
			if ri.OpType == replication.ObjectReplicationType {
				// all incoming calls go through optimized path.
				rinfos.Targets[index] = ri.replicateObject(ctx, objectAPI, tgt)
			} else {
				rinfos.Targets[index] = ri.replicateAll(ctx, objectAPI, tgt)
			}
		}(i, tgt)
	}
	wg.Wait()

	// Record the outcome so the deferred audit log reports the result of this
	// attempt instead of always falling back to the status stored on disk.
	replicationStatus = rinfos.ReplicationStatus()

	// FIXME: add support for missing replication events
	// - event.ObjectReplicationMissedThreshold
	// - event.ObjectReplicationReplicatedAfterThreshold
	eventName := event.ObjectReplicationComplete
	if replicationStatus == replication.Failed {
		eventName = event.ObjectReplicationFailed
	}
	newReplStatusInternal := rinfos.ReplicationStatusInternal()
	// Note that internal replication status(es) may match for previously replicated objects - in such cases
	// metadata should be updated with last resync timestamp.
	if objInfo.ReplicationStatusInternal != newReplStatusInternal || rinfos.ReplicationResynced() {
		popts := ObjectOptions{
			MTime:     objInfo.ModTime,
			VersionID: objInfo.VersionID,
			EvalMetadataFn: func(oi *ObjectInfo) error {
				oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = newReplStatusInternal
				oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
				oi.UserDefined[xhttp.AmzBucketReplicationStatus] = string(rinfos.ReplicationStatus())
				for _, rinfo := range rinfos.Targets {
					if rinfo.ResyncTimestamp != "" {
						oi.UserDefined[targetResetHeader(rinfo.Arn)] = rinfo.ResyncTimestamp
					}
				}
				if objInfo.UserTags != "" {
					oi.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
				}
				return nil
			},
		}

		if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
			logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w",
				bucket, objInfo.Name, objInfo.VersionID, err))
		}
		opType := replication.MetadataReplicationType
		if rinfos.Action() == replicateAll {
			opType = replication.ObjectReplicationType
		}
		for _, rinfo := range rinfos.Targets {
			if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
				globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.Duration, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType)
			}
		}
	}

	sendEvent(eventArgs{
		EventName:  eventName,
		BucketName: bucket,
		Object:     objInfo,
		Host:       "Internal: [Replication]",
	})

	// re-queue failures once more - keep a retry count to avoid flooding the queue if
	// the target site is down. Leave it to scanner to catch up instead.
	if rinfos.ReplicationStatus() != replication.Completed {
		ri.OpType = replication.HealReplicationType
		ri.EventType = ReplicateMRF
		ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
		ri.RetryCount++
		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
	}
}
// replicateObject replicates the data of the specified object version to the
// destination bucket of a single remote target, always issuing a full PUT
// (multipart when the source object is multipart).
//
// The source object is not modified here; the returned replicatedTargetInfo
// carries the per-target outcome for the caller. The status starts out as
// Failed and is reported as Completed only when the target already reports
// Completed and the pending resync does not require this target, when the
// upload succeeds, or when the target answers PreConditionFailed.
func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
	startTime := time.Now()
	objInfo := ri.ObjectInfo.Clone()
	bucket := objInfo.Bucket
	object := objInfo.Name
	// Best-effort size for the early-exit paths; it is recomputed (with error
	// handling) once the source object has been opened.
	sz, _ := objInfo.GetActualSize()

	rAction := replicateAll
	rinfo = replicatedTargetInfo{
		Size:                  sz,
		Arn:                   tgt.ARN,
		PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
		ReplicationStatus:     replication.Failed,
		OpType:                ri.OpType,
		ReplicationAction:     rAction,
	}

	// notTracked emits the "not tracked" event for the object as currently known
	// (objInfo is replaced with the freshly read info further below).
	notTracked := func() {
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
	}

	if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
		rinfo.ReplicationStatus = replication.Completed
		rinfo.ReplicationResynced = true
		return
	}

	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
		logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN))
		notTracked()
		return
	}

	versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
	versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)

	gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
		VersionID:        objInfo.VersionID,
		Versioned:        versioned,
		VersionSuspended: versionSuspended,
	})
	if err != nil {
		// A missing object/version is not reported; any other read failure is.
		if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
			notTracked()
			logger.LogIf(ctx, fmt.Errorf("Unable to read source object for replication %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
		}
		return
	}
	defer gr.Close()

	objInfo = gr.ObjInfo
	size, err := objInfo.GetActualSize()
	if err != nil {
		logger.LogIf(ctx, err)
		notTracked()
		return
	}

	if tgt.Bucket == "" {
		logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
		notTracked()
		return rinfo
	}

	// From here on every return records the duration and, for a completed
	// existing-object replication with a reset ID, the resync timestamp.
	defer func() {
		if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
			rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
			rinfo.ReplicationResynced = true
		}
		rinfo.Duration = time.Since(startTime)
	}()

	rinfo.ReplicationStatus = replication.Completed
	rinfo.Size = size
	rinfo.ReplicationAction = rAction
	// use core client to avoid doing multipart on PUT
	c := &minio.Core{Client: tgt.Client}

	putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
	if err != nil {
		// Nothing was sent to the target, so this must not be reported as Completed.
		rinfo.ReplicationStatus = replication.Failed
		logger.LogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err))
		notTracked()
		return
	}

	var headerSize int
	for k, v := range putOpts.Header() {
		headerSize += len(k) + len(v)
	}

	opts := &bandwidth.MonitorReaderOptions{
		Bucket:     objInfo.Bucket,
		TargetARN:  tgt.ARN,
		HeaderSize: headerSize,
	}
	// When the bucket/target pair is throttled, bound the monitored read with throttleDeadline.
	newCtx := ctx
	if globalBucketMonitor.IsThrottled(bucket, tgt.ARN) {
		var cancel context.CancelFunc
		newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
		defer cancel()
	}
	r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)

	if objInfo.isMultipart() {
		if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
			r, objInfo, putOpts); err != nil {
			// PreConditionFailed is deliberately not treated as a failure.
			if minio.ToErrorResponse(err).Code != "PreConditionFailed" {
				rinfo.ReplicationStatus = replication.Failed
				logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
			}
		}
	} else {
		if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil {
			// PreConditionFailed is deliberately not treated as a failure.
			if minio.ToErrorResponse(err).Code != "PreConditionFailed" {
				rinfo.ReplicationStatus = replication.Failed
				logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
			}
		}
	}
	return
}
// replicateAll brings a single remote target in sync with the specified
// object version. It stats the version on the target first:
//   - if the stat succeeds and getReplicationAction reports replicateNone,
//     nothing is sent and the target is reported as Completed;
//   - if the stat succeeds and only metadata differs, the metadata is updated
//     on the target via CopyObject;
//   - otherwise (including any stat error) the full object is uploaded.
//
// The source object is not modified here; the returned replicatedTargetInfo
// carries the per-target outcome for the caller.
func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
	startTime := time.Now()
	objInfo := ri.ObjectInfo.Clone()
	bucket := objInfo.Bucket
	object := objInfo.Name
	// Best-effort size for the early-exit paths; it is recomputed (with error
	// handling) once the source object has been opened.
	sz, _ := objInfo.GetActualSize()

	// set defaults for replication action based on operation being performed - actual
	// replication action can only be determined after stat on remote. This default is
	// needed for updating replication metrics correctly when target is offline.
	rAction := replicateMetadata

	rinfo = replicatedTargetInfo{
		Size:                  sz,
		Arn:                   tgt.ARN,
		PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
		ReplicationStatus:     replication.Failed,
		OpType:                ri.OpType,
		ReplicationAction:     rAction,
	}

	// notTracked emits the "not tracked" event for the object as currently known
	// (objInfo is replaced with the freshly read info further below).
	notTracked := func() {
		sendEvent(eventArgs{
			EventName:  event.ObjectReplicationNotTracked,
			BucketName: bucket,
			Object:     objInfo,
			Host:       "Internal: [Replication]",
		})
	}

	if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
		rinfo.ReplicationStatus = replication.Completed
		rinfo.ReplicationResynced = true
		return
	}

	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
		logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN))
		notTracked()
		return
	}

	versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
	versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)

	gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
		VersionID:        objInfo.VersionID,
		Versioned:        versioned,
		VersionSuspended: versionSuspended,
	})
	if err != nil {
		// A missing object/version is not reported; any other read failure is.
		if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
			notTracked()
			logger.LogIf(ctx, fmt.Errorf("Unable to read source object for replication %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
		}
		return
	}
	defer gr.Close()

	objInfo = gr.ObjInfo
	size, err := objInfo.GetActualSize()
	if err != nil {
		logger.LogIf(ctx, err)
		notTracked()
		return
	}

	if tgt.Bucket == "" {
		logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
		notTracked()
		return rinfo
	}

	// From here on every return records the duration and, for a completed
	// existing-object replication with a reset ID, the resync timestamp.
	defer func() {
		if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
			rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
			rinfo.ReplicationResynced = true
		}
		rinfo.Duration = time.Since(startTime)
	}()

	// Default to a full copy; refined below only when the stat on the target succeeds.
	rAction = replicateAll
	oi, cerr := tgt.StatObject(ctx, tgt.Bucket, object, minio.StatObjectOptions{
		VersionID: objInfo.VersionID,
		Internal: minio.AdvancedGetOptions{
			ReplicationProxyRequest: "false",
		},
	})
	if cerr == nil {
		rAction = getReplicationAction(objInfo, oi, ri.OpType)
		rinfo.ReplicationStatus = replication.Completed
		if rAction == replicateNone {
			if ri.OpType == replication.ExistingObjectReplicationType &&
				objInfo.ModTime.Unix() > oi.LastModified.Unix() && objInfo.VersionID == nullVersionID {
				logger.LogIf(ctx, fmt.Errorf("Unable to replicate %s/%s (null). Newer version exists on target", bucket, object))
				notTracked()
			}
			// object with same VersionID already exists, replication kicked off by
			// PutObject might have completed
			if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending || objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType {
				// if metadata is not updated for some reason after replication, such as
				// 503 encountered while updating metadata - make sure to set ReplicationStatus
				// as Completed.
				//
				// Note: Replication Stats would have been updated despite metadata update failure.
				rinfo.ReplicationAction = rAction
				rinfo.ReplicationStatus = replication.Completed
			}
			return
		}
	}

	rinfo.ReplicationStatus = replication.Completed
	rinfo.Size = size
	rinfo.ReplicationAction = rAction
	// use core client to avoid doing multipart on PUT
	c := &minio.Core{Client: tgt.Client}

	if rAction != replicateAll {
		// replicate metadata for object tagging/copy with metadata replacement
		srcOpts := minio.CopySrcOptions{
			Bucket:    tgt.Bucket,
			Object:    object,
			VersionID: objInfo.VersionID,
		}
		dstOpts := minio.PutObjectOptions{
			Internal: minio.AdvancedPutOptions{
				SourceVersionID:    objInfo.VersionID,
				ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
			},
		}
		if _, err = c.CopyObject(ctx, tgt.Bucket, object, tgt.Bucket, object, getCopyObjMetadata(objInfo, tgt.StorageClass), srcOpts, dstOpts); err != nil {
			rinfo.ReplicationStatus = replication.Failed
			logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
		}
	} else {
		var putOpts minio.PutObjectOptions
		putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo)
		if err != nil {
			// Nothing was sent to the target, so this must not be reported as Completed.
			rinfo.ReplicationStatus = replication.Failed
			logger.LogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err))
			notTracked()
			return
		}
		var headerSize int
		for k, v := range putOpts.Header() {
			headerSize += len(k) + len(v)
		}

		opts := &bandwidth.MonitorReaderOptions{
			Bucket:     objInfo.Bucket,
			TargetARN:  tgt.ARN,
			HeaderSize: headerSize,
		}
		// When the bucket/target pair is throttled, bound the monitored read with throttleDeadline.
		newCtx := ctx
		if globalBucketMonitor.IsThrottled(bucket, tgt.ARN) {
			var cancel context.CancelFunc
			newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
			defer cancel()
		}
		r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)

		if objInfo.isMultipart() {
			if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
				r, objInfo, putOpts); err != nil {
				// PreConditionFailed is deliberately not treated as a failure.
				if minio.ToErrorResponse(err).Code != "PreConditionFailed" {
					rinfo.ReplicationStatus = replication.Failed
					logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
				} else {
					rinfo.ReplicationStatus = replication.Completed
				}
			}
		} else {
			if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil {
				// PreConditionFailed is deliberately not treated as a failure.
				if minio.ToErrorResponse(err).Code != "PreConditionFailed" {
					rinfo.ReplicationStatus = replication.Failed
					logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
				} else {
					rinfo.ReplicationStatus = replication.Completed
				}
			}
		}
	}
	return
}
// replicateObjectWithMultipart uploads objInfo to bucket/object on the remote
// as a multipart upload, reading each part's bytes sequentially from r using
// the part sizes recorded in objInfo.Parts.
//
// On any failure after the upload has been initiated, the remote upload is
// aborted (up to three attempts) so it does not keep consuming space on the
// remote cluster. The returned error is the one that caused the failure.
func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts minio.PutObjectOptions) (err error) {
	uploadedParts := make([]minio.CompletePart, 0, len(objInfo.Parts))

	uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts)
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			return
		}
		// block and abort remote upload upon failure.
		const maxAbortAttempts = 3
		for attempt := 1; attempt <= maxAbortAttempts; attempt++ {
			// ctx may already be cancelled (that can be why the upload failed), which
			// would make every abort fail immediately and leak the remote upload.
			// Use a detached, time-bound context for the cleanup call instead.
			actx, cancel := context.WithTimeout(context.Background(), time.Minute)
			aerr := c.AbortMultipartUpload(actx, bucket, object, uploadID)
			cancel()
			if aerr == nil {
				return
			}
			logger.LogIf(ctx,
				fmt.Errorf("Trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster",
					humanize.Ordinal(attempt), uploadID, bucket, object, aerr))
			// No point in pausing after the final attempt.
			if attempt < maxAbortAttempts {
				time.Sleep(time.Second)
			}
		}
	}()

	var (
		hr    *hash.Reader
		pInfo minio.ObjectPart
	)

	for _, partInfo := range objInfo.Parts {
		// Wrap the shared stream so that exactly this part's bytes are consumed.
		hr, err = hash.NewReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
		if err != nil {
			return err
		}
		pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.ActualSize, "", "", opts.ServerSideEncryption)
		if err != nil {
			return err
		}
		if pInfo.Size != partInfo.ActualSize {
			return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
		}
		uploadedParts = append(uploadedParts, minio.CompletePart{
			PartNumber: pInfo.PartNumber,
			ETag:       pInfo.ETag,
		})
	}
	_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{
		Internal: minio.AdvancedPutOptions{
			SourceMTime: objInfo.ModTime,
			// always set this to distinguish between `mc mirror` replication and serverside
			ReplicationRequest: true,
		},
	})
	return err
}
// filterReplicationStatusMetadata returns metadata without the bucket
// replication status entry, for use on COPY. The input map is never modified:
// when the entry is absent the input map itself is returned, otherwise a
// filtered copy is returned.
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
	if _, present := metadata[xhttp.AmzBucketReplicationStatus]; !present {
		// Nothing to strip, hand back the caller's map untouched.
		return metadata
	}

	filtered := make(map[string]string, len(metadata))
	for key, value := range metadata {
		if key == xhttp.AmzBucketReplicationStatus {
			continue
		}
		filtered[key] = value
	}
	return filtered
}
// DeletedObjectReplicationInfo describes a delete (or delete marker) that is
// queued for replication. It is one of the operation types accepted by the
// replication workers.
type DeletedObjectReplicationInfo struct {
// DeletedObject is the deleted object/version being replicated.
DeletedObject
// Bucket is the source bucket of the deleted object.
Bucket string
// EventType is presumably one of the Replicate* audit trail values - confirm against callers.
EventType string
// OpType selects the queue used by queueReplicaDeleteTask.
OpType replication.Type
// ResetID and TargetArn are filled in per target by queueReplicateDeletesWrapper
// from the resync decision.
ResetID string
TargetArn string
}
// ToMRFEntry builds the MRF entry for this delete operation. The delete marker
// version ID is used as the entry's version when set; otherwise the version ID
// of the deleted object is used.
func (di DeletedObjectReplicationInfo) ToMRFEntry() MRFReplicateEntry {
	entry := MRFReplicateEntry{
		Bucket:    di.Bucket,
		Object:    di.ObjectName,
		versionID: di.VersionID,
	}
	if di.DeleteMarkerVersionID != "" {
		entry.versionID = di.DeleteMarkerVersionID
	}
	return entry
}
// Replication specific API names.
// NOTE(review): presumably used to label replication operations in logs/audit
// entries - the usage is not visible in this part of the file.
const (
ReplicateObjectAPI = "ReplicateObject"
ReplicateDeleteAPI = "ReplicateDelete"
)
// Audit trail identifiers describing why a replication operation was
// triggered. They are stored in the EventType field of replication operations
// (e.g. ReplicateIncoming when scheduling, ReplicateMRF when re-queueing a failure).
const (
// ReplicateQueued - replication being queued trail
ReplicateQueued = "replicate:queue"
// ReplicateExisting - audit trail for existing objects replication
ReplicateExisting = "replicate:existing"
// ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers
ReplicateExistingDelete = "replicate:existing:delete"
// ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
ReplicateMRF = "replicate:mrf"
// ReplicateIncoming - audit trail of inline replication
ReplicateIncoming = "replicate:incoming"
// ReplicateIncomingDelete - audit trail of inline replication of deletes.
ReplicateIncomingDelete = "replicate:incoming:delete"
// ReplicateHeal - audit trail for healing of failed/pending replications
ReplicateHeal = "replicate:heal"
// ReplicateHealDelete - audit trail of healing of failed/pending delete replications.
ReplicateHealDelete = "replicate:heal:delete"
)
// Process-wide replication state, both assigned by initBackgroundReplication.
var (
// globalReplicationPool is the worker pool that replication operations are queued on.
globalReplicationPool *ReplicationPool
// globalReplicationStats accumulates per bucket/target replication statistics.
globalReplicationStats *ReplicationStats
)
// ReplicationPool describes replication pool: a resizable set of hashed
// workers for regular traffic, one worker for existing-object replication and
// a resizable set of MRF workers for healing failed/pending replications.
type ReplicationPool struct {
// atomic ops:
// activeWorkers counts operations currently running on the hashed workers.
activeWorkers int32
// activeMRFWorkers counts operations currently running on MRF workers.
activeMRFWorkers int32
// objLayer is handed to every replication operation executed by the pool.
objLayer ObjectLayer
// ctx stops all workers once done.
ctx context.Context
// priority is "fast", "slow" or anything else for auto mode.
priority string
// mu guards workers, priority and mrfWorkerSize.
mu sync.RWMutex
resyncer *replicationResyncer
// workers:
// workers holds one input channel per hashed worker; see getWorkerCh.
workers []chan ReplicationWorkerOperation
// existingWorkers is the input channel for existing-object replication.
existingWorkers chan ReplicationWorkerOperation
workerWg sync.WaitGroup
// mrf:
// mrfWorkerKillCh receives one message per MRF worker that should exit.
mrfWorkerKillCh chan struct{}
// mrfReplicaCh is the input channel shared by all MRF workers.
mrfReplicaCh chan ReplicationWorkerOperation
// mrfSaveCh, mrfStopCh and saveStateCh are consumed outside this part of the
// file - presumably by the MRF/stats persistence goroutines; confirm there.
mrfSaveCh chan MRFReplicateEntry
mrfStopCh chan struct{}
// mrfWorkerSize is the number of MRF workers requested so far.
mrfWorkerSize int
saveStateCh chan struct{}
mrfWorkerWg sync.WaitGroup
}
// ReplicationWorkerOperation is a shared interface of replication operations
// that can be sent to the pool's worker channels. The workers dispatch on the
// concrete type (ReplicateObjectInfo or DeletedObjectReplicationInfo).
type ReplicationWorkerOperation interface {
// ToMRFEntry returns the entry to record when the operation could not be queued.
ToMRFEntry() MRFReplicateEntry
}
// Worker pool sizes selected by the replication priority ("fast", "slow" or
// auto). In auto mode the pools may be grown up to the max limits when the
// queues overflow.
const (
// WorkerMaxLimit max number of workers per node for "fast" mode
WorkerMaxLimit = 500
// WorkerMinLimit min number of workers per node for "slow" mode
WorkerMinLimit = 50
// WorkerAutoDefault is default number of workers for "auto" mode
WorkerAutoDefault = 100
// MRFWorkerMaxLimit max number of mrf workers per node for "fast" mode
MRFWorkerMaxLimit = 8
// MRFWorkerMinLimit min number of mrf workers per node for "slow" mode
MRFWorkerMinLimit = 2
// MRFWorkerAutoDefault is default number of mrf workers for "auto" mode
MRFWorkerAutoDefault = 4
)
// NewReplicationPool builds a replication pool sized according to
// opts.Priority ("fast", "slow", anything else including empty meaning auto),
// starts its hashed workers, MRF workers and the single existing-object
// worker, and launches the background goroutines for resync persistence, MRF
// processing/persistence and stats saving.
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
	priority := opts.Priority
	if priority == "" {
		priority = "auto"
	}

	// Auto sizing unless an explicit fast/slow priority was requested.
	workers, failedWorkers := WorkerAutoDefault, MRFWorkerAutoDefault
	switch priority {
	case "fast":
		workers, failedWorkers = WorkerMaxLimit, MRFWorkerMaxLimit
	case "slow":
		workers, failedWorkers = WorkerMinLimit, MRFWorkerMinLimit
	}

	pool := &ReplicationPool{
		workers:         make([]chan ReplicationWorkerOperation, 0, workers),
		existingWorkers: make(chan ReplicationWorkerOperation, 100000),
		mrfReplicaCh:    make(chan ReplicationWorkerOperation, 100000),
		mrfWorkerKillCh: make(chan struct{}, failedWorkers),
		resyncer:        newresyncer(),
		mrfSaveCh:       make(chan MRFReplicateEntry, 100000),
		mrfStopCh:       make(chan struct{}, 1),
		saveStateCh:     make(chan struct{}, 1),
		ctx:             ctx,
		objLayer:        o,
		priority:        priority,
	}

	pool.ResizeWorkers(workers, 0)
	pool.ResizeFailedWorkers(failedWorkers)

	// Dedicated worker for existing-object replication; it is not tracked in activeWorkers.
	pool.workerWg.Add(1)
	go pool.AddWorker(pool.existingWorkers, nil)

	go pool.resyncer.PersistToDisk(ctx, o)
	go pool.processMRF()
	go pool.persistMRF()
	go pool.saveStatsToDisk()
	return pool
}
// AddMRFWorker runs an MRF worker loop that replicates the object operations
// received on the MRF channel. The loop ends when the pool context is done,
// the MRF channel is closed, or a kill message is received. The MRF wait group
// is released on exit, so the caller must have added to it beforehand.
func (p *ReplicationPool) AddMRFWorker() {
	defer p.mrfWorkerWg.Done()

	// process handles a single operation; only object replication is supported here.
	process := func(op ReplicationWorkerOperation) {
		ri, isObject := op.(ReplicateObjectInfo)
		if !isObject {
			logger.LogOnceIf(p.ctx, fmt.Errorf("unknown mrf replication type: %T", op), "unknown-mrf-replicate-type")
			return
		}
		atomic.AddInt32(&p.activeMRFWorkers, 1)
		replicateObject(p.ctx, ri, p.objLayer)
		atomic.AddInt32(&p.activeMRFWorkers, -1)
	}

	for {
		select {
		case <-p.ctx.Done():
			return
		case <-p.mrfWorkerKillCh:
			return
		case op, open := <-p.mrfReplicaCh:
			if !open {
				return
			}
			process(op)
		}
	}
}
// AddWorker runs a replication worker loop over input until the pool context
// is done or input is closed. Object and delete operations are replicated;
// any other operation type is logged once.
//
// opTracker is optional: when non-nil it is atomically incremented while an
// operation is running and decremented when it finishes. The worker wait group
// is released on exit, so the caller must have added to it beforehand.
func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opTracker *int32) {
	defer p.workerWg.Done()

	// track adjusts the optional in-flight counter.
	track := func(delta int32) {
		if opTracker != nil {
			atomic.AddInt32(opTracker, delta)
		}
	}

	for {
		select {
		case <-p.ctx.Done():
			return
		case op, open := <-input:
			if !open {
				return
			}
			switch task := op.(type) {
			case ReplicateObjectInfo:
				track(1)
				replicateObject(p.ctx, task, p.objLayer)
				track(-1)
			case DeletedObjectReplicationInfo:
				track(1)
				replicateDelete(p.ctx, task, p.objLayer)
				track(-1)
			default:
				logger.LogOnceIf(p.ctx, fmt.Errorf("unknown replication type: %T", op), "unknown-replicate-type")
			}
		}
	}
}
// ActiveWorkers reports how many operations are currently being processed by
// the workers handling replication traffic.
func (p *ReplicationPool) ActiveWorkers() int {
	active := atomic.LoadInt32(&p.activeWorkers)
	return int(active)
}
// ActiveMRFWorkers reports how many operations are currently being processed
// by the workers handling replication failures.
func (p *ReplicationPool) ActiveMRFWorkers() int {
	active := atomic.LoadInt32(&p.activeMRFWorkers)
	return int(active)
}
// ResizeWorkers grows or shrinks the hashed worker set to n workers.
// Requests with n < 1 or n equal to the current size are ignored.
// checkOld, when positive, is the worker count the caller expects; if the
// count is different by the time the lock is held, the request is dropped.
// Shrinking closes the input channels of the removed workers, last one first.
func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
	p.mu.Lock()
	defer p.mu.Unlock()

	current := len(p.workers)
	switch {
	case n < 1, n == current:
		// Invalid target or already satisfied.
		return
	case checkOld > 0 && current != checkOld:
		// Worker count changed while we waited for the lock.
		return
	}

	// Grow: each new worker gets its own buffered input channel.
	for ; current < n; current++ {
		input := make(chan ReplicationWorkerOperation, 10000)
		p.workers = append(p.workers, input)
		p.workerWg.Add(1)
		go p.AddWorker(input, &p.activeWorkers)
	}

	// Shrink: closing the channel makes the corresponding worker exit.
	if len(p.workers) > n {
		for i := len(p.workers) - 1; i >= n; i-- {
			close(p.workers[i])
		}
		p.workers = p.workers[:n]
	}
}
// ResizeWorkerPriority records pri as the pool priority and resizes both the
// regular and the MRF worker sets accordingly: "fast" uses the max limits,
// "slow" the min limits, and any other value the auto defaults. In auto mode a
// set that is currently below its default is grown by a single worker only.
func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
	p.mu.Lock()
	workers, mrfWorkers := WorkerAutoDefault, MRFWorkerAutoDefault
	switch pri {
	case "fast":
		workers, mrfWorkers = WorkerMaxLimit, MRFWorkerMaxLimit
	case "slow":
		workers, mrfWorkers = WorkerMinLimit, MRFWorkerMinLimit
	default:
		if have := len(p.workers); have < WorkerAutoDefault {
			workers = int(math.Min(float64(have+1), WorkerAutoDefault))
		}
		if have := p.mrfWorkerSize; have < MRFWorkerAutoDefault {
			mrfWorkers = int(math.Min(float64(have+1), MRFWorkerAutoDefault))
		}
	}
	p.priority = pri
	// The resize helpers take the lock themselves, so release it first.
	p.mu.Unlock()

	p.ResizeWorkers(workers, 0)
	p.ResizeFailedWorkers(mrfWorkers)
}
// ResizeFailedWorkers adjusts the number of MRF workers to n: missing workers
// are started, surplus workers are asked to exit through the kill channel.
func (p *ReplicationPool) ResizeFailedWorkers(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for ; p.mrfWorkerSize < n; p.mrfWorkerSize++ {
		p.mrfWorkerWg.Add(1)
		go p.AddMRFWorker()
	}
	for ; p.mrfWorkerSize > n; p.mrfWorkerSize-- {
		// Signal asynchronously so the lock is never held on a blocked send.
		go func() { p.mrfWorkerKillCh <- struct{}{} }()
	}
}
// getWorkerCh picks a worker channel deterministically from the hash of the
// concatenated bucket and object names. It returns nil when there are no
// workers. The pool's read lock is taken, so the caller must not hold the
// write lock.
func (p *ReplicationPool) getWorkerCh(bucket, object string) chan<- ReplicationWorkerOperation {
	sum := xxh3.HashString(bucket + object)

	p.mu.RLock()
	defer p.mu.RUnlock()

	count := uint64(len(p.workers))
	if count == 0 {
		return nil
	}
	return p.workers[sum%count]
}
// queueReplicaTask queues ri on the worker matching its operation type without
// blocking: existing-object replication goes to the dedicated channel, heal
// operations go to either the MRF channel or the object's hashed worker, and
// everything else goes to the object's hashed worker.
//
// When no channel can accept the operation it is saved as an MRF entry
// instead. Depending on the priority a warning is then logged ("fast"/"slow")
// or, in auto mode, the worker sets are grown by one up to their max limits.
// A nil pool is a no-op.
func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
	if p == nil {
		return
	}
	var ch, healCh chan<- ReplicationWorkerOperation
	switch ri.OpType {
	case replication.ExistingObjectReplicationType:
		ch = p.existingWorkers
	case replication.HealReplicationType:
		ch = p.mrfReplicaCh
		// getWorkerCh takes (bucket, object); passing them in this order keeps the
		// hash identical to the one used for deletes of the same object.
		healCh = p.getWorkerCh(ri.Bucket, ri.Name)
	default:
		ch = p.getWorkerCh(ri.Bucket, ri.Name)
	}
	if ch == nil && healCh == nil {
		return
	}

	select {
	case <-p.ctx.Done():
	case healCh <- ri: // a nil healCh never becomes ready
	case ch <- ri:
	default:
		// Every candidate queue is full: remember the operation for later.
		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
		p.mu.RLock()
		prio := p.priority
		p.mu.RUnlock()
		switch prio {
		case "fast":
			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic"), string(replicationSubsystem))
		case "slow":
			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
		default:
			if p.ActiveWorkers() < WorkerMaxLimit {
				p.mu.RLock()
				workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
				existing := len(p.workers)
				p.mu.RUnlock()
				// existing guards against a concurrent resize between unlock and lock.
				p.ResizeWorkers(workers, existing)
			}
			if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
				p.mu.RLock()
				workers := int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerMaxLimit))
				p.mu.RUnlock()
				p.ResizeFailedWorkers(workers)
			}
		}
	}
}
// queueReplicateDeletesWrapper queues one delete replication task per target
// that the resync decision marks for replication, stamping each task with
// that target's ARN and reset ID.
func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObjectResync ResyncDecision) {
	for arn, decision := range existingObjectResync.targets {
		if !decision.Replicate {
			continue
		}
		doi.ResetID = decision.ResetID
		doi.TargetArn = arn
		globalReplicationPool.queueReplicaDeleteTask(doi)
	}
}
// queueReplicaDeleteTask queues doi without blocking: existing-object
// replication goes to the dedicated channel, every other operation type to
// the object's hashed worker.
//
// When the operation cannot be queued it is saved as an MRF entry instead.
// Depending on the priority a warning is then logged ("fast"/"slow") or, in
// auto mode, the worker set is grown by one up to its max limit.
// A nil pool is a no-op.
func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInfo) {
	if p == nil {
		return
	}

	var ch chan<- ReplicationWorkerOperation
	if doi.OpType == replication.ExistingObjectReplicationType {
		ch = p.existingWorkers
	} else {
		ch = p.getWorkerCh(doi.Bucket, doi.ObjectName)
	}

	select {
	case <-p.ctx.Done():
		return
	case ch <- doi:
		return
	default:
		// Queue is full (or there is no worker); fall through to the overflow handling.
	}

	globalReplicationPool.queueMRFSave(doi.ToMRFEntry())

	p.mu.RLock()
	prio := p.priority
	p.mu.RUnlock()

	switch prio {
	case "fast":
		logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes"), string(replicationSubsystem))
	case "slow":
		logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
	default:
		if p.ActiveWorkers() >= WorkerMaxLimit {
			return
		}
		p.mu.RLock()
		workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
		existing := len(p.workers)
		p.mu.RUnlock()
		// existing guards against a concurrent resize between unlock and lock.
		p.ResizeWorkers(workers, existing)
	}
}
// replicationPoolOpts carries the settings used to create a ReplicationPool.
type replicationPoolOpts struct {
// Priority is "fast", "slow" or anything else (including empty) for auto mode.
Priority string
}
// initBackgroundReplication creates the global replication pool, using the
// configured replication priority, followed by the global replication stats,
// and starts loading the initial replication metrics in the background.
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
	poolOpts := replicationPoolOpts{
		Priority: globalAPIConfig.getReplicationPriority(),
	}
	globalReplicationPool = NewReplicationPool(ctx, objectAPI, poolOpts)

	stats := NewReplicationStats(ctx, objectAPI)
	globalReplicationStats = stats
	go stats.loadInitialReplicationMetrics(ctx)
}
// proxyResult is the outcome of trying to proxy a request to a replication target.
type proxyResult struct {
// Proxy is true when a remote target served the HEAD and can be proxied to.
Proxy bool
// Err is set when the target rejected the request with an invalid range error.
Err error
}
// proxyGetToReplicationTarget gets a reader from a replication target if
// active-active replication is in place and this node returns a 404.
//
// It first issues a proxied HEAD to find a target that has the object; when
// none qualifies, a nil reader is returned together with the (non-proxying)
// proxy result. Otherwise the object is fetched from that target, pinned to
// the ETag seen by the HEAD, and returned with ObjInfo describing the remote
// object. For range requests ObjInfo.Size is taken from the returned
// Content-Range header.
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, _ http.Header, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (gr *GetObjectReader, proxy proxyResult, err error) {
	tgt, oi, proxy := proxyHeadToRepTarget(ctx, bucket, object, rs, opts, proxyTargets)
	if !proxy.Proxy {
		return nil, proxy, nil
	}
	fn, _, _, err := NewGetObjectReader(nil, oi, opts)
	if err != nil {
		return nil, proxy, err
	}
	gopts := minio.GetObjectOptions{
		VersionID:            opts.VersionID,
		ServerSideEncryption: opts.ServerSideEncryption,
		Internal: minio.AdvancedGetOptions{
			ReplicationProxyRequest: "true",
		},
		PartNumber: opts.PartNumber,
	}
	// forward the requested range, if any, to the target
	if rs != nil {
		h, err := rs.ToHeader()
		if err != nil {
			return nil, proxy, err
		}
		gopts.Set(xhttp.Range, h)
	}
	// Make sure to match ETag when proxying.
	if err = gopts.SetMatchETag(oi.ETag); err != nil {
		return nil, proxy, err
	}
	c := minio.Core{Client: tgt.Client}
	// The object lives in the target's bucket, which is the bucket the HEAD above
	// was issued against and may be named differently from the local one.
	obj, _, h, err := c.GetObject(ctx, tgt.Bucket, object, gopts)
	if err != nil {
		return nil, proxy, err
	}
	closeReader := func() { obj.Close() }
	reader, err := fn(obj, h, closeReader)
	if err != nil {
		return nil, proxy, err
	}
	reader.ObjInfo = oi.Clone()
	if rs != nil {
		contentSize, err := parseSizeFromContentRange(h)
		if err != nil {
			// The reader is not handed to the caller, so release it here.
			reader.Close()
			return nil, proxy, err
		}
		reader.ObjInfo.Size = contentSize
	}

	return reader, proxyResult{Proxy: true}, nil
}
// getProxyTargets returns the remote bucket targets whose replication rules
// match object. An empty (non-nil) result is returned when versioning is
// suspended, when opts.ProxyRequest is not set, or when the bucket has no
// usable replication configuration.
func getProxyTargets(ctx context.Context, bucket, object string, opts ObjectOptions) (tgts *madmin.BucketTargets) {
	if opts.VersionSuspended || !opts.ProxyRequest {
		return &madmin.BucketTargets{}
	}

	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil || cfg == nil {
		return &madmin.BucketTargets{}
	}

	arns := cfg.FilterTargetArns(replication.ObjectOpts{Name: object})
	targets := make([]madmin.BucketTarget, 0, len(arns))
	for _, arn := range arns {
		targets = append(targets, globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, arn))
	}
	return &madmin.BucketTargets{Targets: targets}
}
// proxyHeadToRepTarget issues a proxied HEAD for bucket/object against each of
// proxyTargets in turn and returns the first target that has the object,
// together with an ObjectInfo built from the remote response.
//
// Targets that are unknown, offline, have proxying disabled or fail the HEAD
// are skipped. An invalid range error from a target stops the search and is
// reported through proxyResult.Err. When no target qualifies, a nil target
// and a zero proxyResult are returned.
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgt *TargetClient, oi ObjectInfo, proxy proxyResult) {
	// this option is set when active-active replication is in place between site A -> B,
	// and site B does not have the object yet.
	// Never proxy a request that is itself a proxy request or carries the proxy header.
	if opts.ProxyRequest || opts.ProxyHeaderSet { // true only when site B sets MinIOSourceProxyRequest header
		return nil, oi, proxy
	}
	for _, t := range proxyTargets.Targets {
		tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, t.Arn)
		if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
			continue
		}
		// if proxying explicitly disabled on remote target
		if tgt.disableProxy {
			continue
		}

		gopts := minio.GetObjectOptions{
			VersionID:            opts.VersionID,
			ServerSideEncryption: opts.ServerSideEncryption,
			Internal: minio.AdvancedGetOptions{
				ReplicationProxyRequest: "true",
			},
			PartNumber: opts.PartNumber,
		}
		if rs != nil {
			h, err := rs.ToHeader()
			if err != nil {
				logger.LogIf(ctx, fmt.Errorf("Invalid range header for %s/%s(%s) - %w", bucket, object, opts.VersionID, err))
				continue
			}
			gopts.Set(xhttp.Range, h)
		}

		objInfo, err := tgt.StatObject(ctx, t.TargetBucket, object, gopts)
		if err != nil {
			if isErrInvalidRange(ErrorRespToObjectError(err, bucket, object)) {
				return nil, oi, proxyResult{Err: err}
			}
			continue
		}

		// Tags the remote returned but that do not validate are dropped rather than
		// dereferencing the nil result of a failed conversion.
		var userTags string
		if objTags, terr := tags.MapToObjectTags(objInfo.UserTags); terr == nil && objTags != nil {
			userTags = objTags.String()
		}
		oi = ObjectInfo{
			Bucket:                    bucket,
			Name:                      object,
			ModTime:                   objInfo.LastModified,
			Size:                      objInfo.Size,
			ETag:                      objInfo.ETag,
			VersionID:                 objInfo.VersionID,
			IsLatest:                  objInfo.IsLatest,
			DeleteMarker:              objInfo.IsDeleteMarker,
			ContentType:               objInfo.ContentType,
			Expires:                   objInfo.Expires,
			StorageClass:              objInfo.StorageClass,
			ReplicationStatusInternal: objInfo.ReplicationStatus,
			UserTags:                  userTags,
		}
		// Only the first value of every metadata header is kept.
		oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
		for k, v := range objInfo.Metadata {
			if len(v) > 0 {
				oi.UserDefined[k] = v[0]
			}
		}
		// Content-Encoding may be stored under its canonical or its lower-case name.
		ce, ok := oi.UserDefined[xhttp.ContentEncoding]
		if !ok {
			ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
		}
		if ok {
			oi.ContentEncoding = ce
		}
		return tgt, oi, proxyResult{Proxy: true}
	}
	return nil, oi, proxy
}
// proxyHeadToReplicationTarget gets object info from a replication target if
// active-active replication is in place and this node returns a 404. It is
// proxyHeadToRepTarget without the target client in the result.
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (oi ObjectInfo, proxy proxyResult) {
	_, info, result := proxyHeadToRepTarget(ctx, bucket, object, rs, opts, proxyTargets)
	return info, result
}
// scheduleReplication replicates objInfo inline when the decision is
// synchronous, otherwise queues it on the global replication pool. Afterwards
// the replication stats of every target in the decision are updated with the
// object's size and current replication status; the stats update is skipped
// when the actual size cannot be determined.
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) {
	task := ReplicateObjectInfo{
		ObjectInfo: objInfo,
		OpType:     opType,
		Dsc:        dsc,
		EventType:  ReplicateIncoming,
	}
	switch {
	case dsc.Synchronous():
		replicateObject(ctx, task, o)
	default:
		globalReplicationPool.queueReplicaTask(task)
	}

	size, err := objInfo.GetActualSize()
	if err != nil {
		return
	}
	for arn := range dsc.targetsMap {
		globalReplicationStats.Update(objInfo.Bucket, arn, size, 0, objInfo.ReplicationStatus, replication.StatusType(""), opType)
	}
}
// scheduleReplicationDelete queues dv on the global replication pool and
// records a pending delete replication in the stats for every replication
// target and every purge target of the deleted version.
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
	globalReplicationPool.queueReplicaDeleteTask(dv)

	markPending := func(arn string) {
		globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
	}
	for arn := range dv.ReplicationState.Targets {
		markPending(arn)
	}
	for arn := range dv.ReplicationState.PurgeTargets {
		markPending(arn)
	}
}
// replicationConfig pairs a bucket's replication configuration with its remote targets.
type replicationConfig struct {
// Config is the replication configuration; nil means replication is not configured.
Config *replication.Config
// remotes lists the remote targets consulted for resync decisions; may be nil.
remotes *madmin.BucketTargets
}
// Empty reports whether no replication configuration is set.
func (c replicationConfig) Empty() bool {
	configured := c.Config != nil
	return !configured
}
// Replicate reports whether the replication configuration selects the object
// described by opts for replication.
func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
	cfg := c.Config
	return cfg.Replicate(opts)
}
// Resync computes the per-target resync decision for oi. A zero decision is
// returned when no replication configuration is set.
//
// For delete markers the given dsc is updated for every target matching the
// existing-object rules and then used for the decision; a zero decision is
// returned when no target matches. For all other objects the replication
// decision is recomputed from scratch, ignoring any previous replication
// status recorded on the object.
func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
	if c.Empty() {
		return
	}

	if !oi.DeleteMarker {
		// Ignore previous replication status when deciding if object can be re-replicated
		stripped := oi.Clone()
		stripped.ReplicationStatusInternal = ""
		stripped.VersionPurgeStatusInternal = ""
		stripped.ReplicationStatus = ""
		stripped.VersionPurgeStatus = ""
		delete(stripped.UserDefined, xhttp.AmzBucketReplicationStatus)

		fresh := mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(stripped, replication.ExistingObjectReplicationType, ObjectOptions{}))
		return c.resync(oi, &fresh, tgtStatuses)
	}

	// Now overlay existing object replication choices for target
	opts := replication.ObjectOpts{
		Name:           oi.Name,
		SSEC:           crypto.SSEC.IsEncrypted(oi.UserDefined),
		UserTags:       oi.UserTags,
		DeleteMarker:   oi.DeleteMarker,
		VersionID:      oi.VersionID,
		OpType:         replication.DeleteReplicationType,
		ExistingObject: true,
	}
	arns := c.Config.FilterTargetArns(opts)
	// indicates no matching target with Existing object replication enabled.
	if len(arns) == 0 {
		return
	}
	for _, arn := range arns {
		opts.TargetArn = arn
		// Update replication decision for target based on existing object replication rule.
		dsc.Set(newReplicateTargetDecision(arn, c.Replicate(opts), false))
	}
	return c.resync(oi, dsc, tgtStatuses)
}
// resync is a wrapper kept separate for testability. For every configured
// remote target that dsc marks for replication, it records the outcome of
// resyncTarget: true if a new reset is requested on already replicated
// objects, or if the object qualifies for existing object replication and no
// reset was requested. Without remotes the returned decision has no targets.
func (c replicationConfig) resync(oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
	r.targets = make(map[string]ResyncTargetDecision)
	if c.remotes == nil {
		return r
	}
	for _, remote := range c.remotes.Targets {
		decision, found := dsc.targetsMap[remote.Arn]
		if !found || !decision.Replicate {
			continue
		}
		r.targets[decision.Arn] = resyncTarget(oi, remote.Arn, remote.ResetID, remote.ResetBeforeDate, tgtStatuses[remote.Arn])
	}
	return r
}
func targetResetHeader(arn string) string {
return fmt.Sprintf("%s-%s", ReservedMetadataPrefixLower+ReplicationReset, arn)
}
// resyncTarget decides whether the object version oi must be (re)replicated to
// the target arn.
//
// resetID and resetBeforeDate describe the reset currently configured on the
// target and are echoed back in the result. tgtStatus is the object's current
// replication status for this target.
func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate time.Time, tgtStatus replication.StatusType) (rd ResyncTargetDecision) {
	rd.ResetID = resetID
	rd.ResetBeforeDate = resetBeforeDate

	marker, found := oi.UserDefined[targetResetHeader(arn)]
	if !found {
		// for backward compatibility
		marker, found = oi.UserDefined[xhttp.MinIOReplicationResetStatus]
	}
	if !found {
		// No reset marker on this version: replicate when `mc replicate reset`
		// covers it, otherwise only when it was never replicated to this target
		// (existing object replication).
		resetCoversObject := resetID != "" && oi.ModTime.Before(resetBeforeDate)
		rd.Replicate = resetCoversObject || tgtStatus == ""
		return rd
	}

	// no reset in progress
	if resetID == "" || resetBeforeDate.Equal(timeSentinel) {
		return rd
	}

	// The marker has two ';'-separated fields; the second holds the reset ID
	// that was last applied to this version.
	sep := strings.IndexByte(marker, ';')
	if sep < 0 {
		return rd
	}
	isNewReset := marker[sep+1:] != resetID
	if !isNewReset && tgtStatus == replication.Completed {
		// already replicated and no reset requested
		return rd
	}
	rd.Replicate = isNewReset && oi.ModTime.Before(resetBeforeDate)
	return rd
}
// resyncTimeInterval is the period at which PersistToDisk checks whether
// in-memory resync status needs to be saved.
const resyncTimeInterval = time.Minute * 1
// PersistToDisk persists in-memory resync metadata stats to disk at periodic
// intervals (every resyncTimeInterval) until ctx is done.
func (s *replicationResyncer) PersistToDisk(ctx context.Context, objectAPI ObjectLayer) {
	timer := time.NewTimer(resyncTimeInterval)
	defer timer.Stop()

	// Per bucket, the LastUpdate value of the status that was last saved
	// successfully to the backend disks.
	savedAt := make(map[string]time.Time)

	// needsSave reports whether the bucket's status has an update that is not
	// saved yet, or has any target whose LastUpdate is still the sentinel
	// (a resync that has not finished).
	needsSave := func(bucket string, brs BucketReplicationResyncStatus) bool {
		if brs.LastUpdate.After(savedAt[bucket]) {
			return true
		}
		for _, tgt := range brs.TargetsMap {
			if tgt.LastUpdate.Equal(timeSentinel) {
				return true
			}
		}
		return false
	}

	for {
		select {
		case <-ctx.Done():
			// server could be restarting - need
			// to exit immediately
			return
		case <-timer.C:
			s.RLock()
			for bucket, brs := range s.statusMap {
				if !needsSave(bucket, brs) {
					continue
				}
				err := saveResyncStatus(ctx, bucket, brs, objectAPI)
				if err != nil {
					logger.LogIf(ctx, fmt.Errorf("Could not save resync metadata to drive for %s - %w", bucket, err))
					continue
				}
				savedAt[bucket] = brs.LastUpdate
			}
			s.RUnlock()
			timer.Reset(resyncTimeInterval)
		}
	}
}
// resyncWorkerCnt limits the number of bucket resyncs in progress at any given time.
const resyncWorkerCnt = 50
// newresyncer returns a resyncer with an empty status map and a worker channel
// pre-filled with resyncWorkerCnt tokens, one per available worker slot.
func newresyncer() *replicationResyncer {
	rs := &replicationResyncer{
		statusMap:      make(map[string]BucketReplicationResyncStatus),
		workerSize:     resyncWorkerCnt,
		resyncCancelCh: make(chan struct{}, resyncWorkerCnt),
		workerCh:       make(chan struct{}, resyncWorkerCnt),
	}
	for remaining := rs.workerSize; remaining > 0; remaining-- {
		rs.workerCh <- struct{}{}
	}
	return rs
}
// resyncBucket resyncs all qualifying objects as per replication rules for the target
// ARN given in opts.
//
// It first takes a worker token from s.workerCh (returning early if ctx is done),
// then walks every object version of opts.bucket. Each version that must be
// resynced is replicated inline via replicateObject or replicateDelete, after which
// the target is queried with StatObject to decide whether the attempt counts as
// replicated or failed. Progress is recorded in s.statusMap after every object.
//
// The deferred function stores the final resync status (ResyncFailed unless the
// walk completes or is canceled), updates the site resync metrics and returns the
// worker token. When heal is true, objects are skipped until the last checkpointed
// object name is seen again.
func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
	select {
	case <-s.workerCh: // block till a worker is available
	case <-ctx.Done():
		return
	}
	// Any early return below leaves the status as failed.
	resyncStatus := ResyncFailed
	defer func() {
		// NOTE(review): assumes s.statusMap already holds an entry with a non-nil
		// TargetsMap for opts.bucket; writing to a nil map would panic - TODO confirm.
		s.Lock()
		m := s.statusMap[opts.bucket]
		st := m.TargetsMap[opts.arn]
		st.LastUpdate = UTCNow()
		st.ResyncStatus = resyncStatus
		m.TargetsMap[opts.arn] = st
		m.LastUpdate = UTCNow()
		s.statusMap[opts.bucket] = m
		s.Unlock()
		globalSiteResyncMetrics.incBucket(opts, resyncStatus)
		// hand the worker token back
		s.workerCh <- struct{}{}
	}()
	// Allocate new results channel to receive ObjectInfo.
	objInfoCh := make(chan ObjectInfo)
	cfg, err := getReplicationConfig(ctx, opts.bucket)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
		return
	}
	tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, opts.bucket)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", opts.bucket, opts.arn, err))
		return
	}
	rcfg := replicationConfig{
		Config:  cfg,
		remotes: tgts,
	}
	// The ARN must match exactly one target in the replication config.
	tgtArns := cfg.FilterTargetArns(
		replication.ObjectOpts{
			OpType:    replication.ResyncReplicationType,
			TargetArn: opts.arn,
		})
	if len(tgtArns) != 1 {
		logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
		return
	}
	tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, opts.arn)
	if tgt == nil {
		logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
		return
	}
	// mark resync status as resync started
	if !heal {
		s.Lock()
		m := s.statusMap[opts.bucket]
		st := m.TargetsMap[opts.arn]
		st.ResyncStatus = ResyncStarted
		m.TargetsMap[opts.arn] = st
		m.LastUpdate = UTCNow()
		s.statusMap[opts.bucket] = m
		s.Unlock()
	}
	// Walk through all object versions - Walk() is always in ascending order needed to ensure
	// delete marker replicated to target after object version is first created.
	if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil {
		logger.LogIf(ctx, err)
		return
	}
	s.RLock()
	m := s.statusMap[opts.bucket]
	st := m.TargetsMap[opts.arn]
	s.RUnlock()
	// Name of the last object recorded by an earlier started or failed run, if any.
	var lastCheckpoint string
	if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
		lastCheckpoint = st.Object
	}
	for obj := range objInfoCh {
		// non-blocking check for a cancel request
		select {
		case <-s.resyncCancelCh:
			resyncStatus = ResyncCanceled
			return
		default:
		}
		// When healing, skip ahead until the checkpointed object name is reached.
		if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name {
			continue
		}
		lastCheckpoint = ""
		roi := getHealReplicateObjectInfo(obj, rcfg)
		if !roi.ExistingObjResync.mustResync() {
			continue
		}
		traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
		if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
			// Delete marker or version purge: exactly one of the two version IDs is set.
			versionID := ""
			dmVersionID := ""
			if roi.VersionPurgeStatus.Empty() {
				dmVersionID = roi.VersionID
			} else {
				versionID = roi.VersionID
			}
			doi := DeletedObjectReplicationInfo{
				DeletedObject: DeletedObject{
					ObjectName:            roi.Name,
					DeleteMarkerVersionID: dmVersionID,
					VersionID:             versionID,
					ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
					DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
					DeleteMarker:          roi.DeleteMarker,
				},
				Bucket:    roi.Bucket,
				OpType:    replication.ExistingObjectReplicationType,
				EventType: ReplicateExistingDelete,
			}
			replicateDelete(ctx, doi, objectAPI)
		} else {
			roi.OpType = replication.ExistingObjectReplicationType
			roi.EventType = ReplicateExisting
			replicateObject(ctx, roi, objectAPI)
		}
		// Query the target for this version; the result decides success below.
		_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
			VersionID: roi.VersionID,
			Internal: minio.AdvancedGetOptions{
				ReplicationProxyRequest: "false",
			},
		})
		s.Lock()
		m = s.statusMap[opts.bucket]
		st = m.TargetsMap[opts.arn]
		// record this object as the latest checkpoint
		st.Object = roi.Name
		success := true
		if err != nil {
			// A method-not-allowed error on a delete marker is counted as replicated.
			if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
				st.ReplicatedCount++
			} else {
				st.FailedCount++
				success = false
			}
		} else {
			st.ReplicatedCount++
			st.ReplicatedSize += roi.Size
		}
		m.TargetsMap[opts.arn] = st
		m.LastUpdate = UTCNow()
		s.statusMap[opts.bucket] = m
		s.Unlock()
		traceFn(err)
		globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
	}
	resyncStatus = ResyncCompleted
}
// start begins a replication resync of opts.bucket for the remote target ARN
// in opts. It validates the request, refuses to start when a resync for the
// same ARN is already started or pending, saves a Pending status to disk,
// mirrors it into the in-memory status map and launches resyncBucket in a
// goroutine. It returns an error if validation, loading or saving fails.
func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opts resyncOpts) error {
	switch {
	case opts.bucket == "":
		return fmt.Errorf("bucket name is empty")
	case opts.arn == "":
		return fmt.Errorf("target ARN specified for resync is empty")
	}

	// The ARN must be present in the bucket's replication configuration.
	cfg, err := getReplicationConfig(ctx, opts.bucket)
	if err != nil {
		return err
	}
	matching := cfg.FilterTargetArns(replication.ObjectOpts{
		OpType:    replication.ResyncReplicationType,
		TargetArn: opts.arn,
	})
	if len(matching) == 0 {
		return fmt.Errorf("arn %s specified for resync not found in replication config", opts.arn)
	}

	// Prefer the in-memory status; fall back to what is stored on disk.
	globalReplicationPool.resyncer.RLock()
	data, found := globalReplicationPool.resyncer.statusMap[opts.bucket]
	globalReplicationPool.resyncer.RUnlock()
	if !found {
		if data, err = loadBucketResyncMetadata(ctx, opts.bucket, objAPI); err != nil {
			return err
		}
	}

	// validate if resync is in progress for this arn
	if cur, exists := data.TargetsMap[opts.arn]; exists && (cur.ResyncStatus == ResyncStarted || cur.ResyncStatus == ResyncPending) {
		return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", opts.bucket, opts.arn)
	}

	status := TargetReplicationResyncStatus{
		ResyncID:         opts.resyncID,
		ResyncBeforeDate: opts.resyncBefore,
		StartTime:        UTCNow(),
		ResyncStatus:     ResyncPending,
		Bucket:           opts.bucket,
	}
	data.TargetsMap[opts.arn] = status
	if err = saveResyncStatus(ctx, opts.bucket, data, objAPI); err != nil {
		return err
	}

	globalReplicationPool.resyncer.Lock()
	defer globalReplicationPool.resyncer.Unlock()

	brs, known := globalReplicationPool.resyncer.statusMap[opts.bucket]
	if !known {
		brs = BucketReplicationResyncStatus{
			Version:    resyncMetaVersion,
			TargetsMap: make(map[string]TargetReplicationResyncStatus),
		}
	}
	brs.TargetsMap[opts.arn] = status
	globalReplicationPool.resyncer.statusMap[opts.bucket] = brs

	go globalReplicationPool.resyncer.resyncBucket(GlobalContext, objAPI, false, opts)
	return nil
}
// trace starts timing an operation on path and returns a function to call when
// it finishes. The returned function publishes a resync trace entry with the
// elapsed time and err, but only if the replication-resync trace type has
// subscribers.
func (s *replicationResyncer) trace(resyncID string, path string) func(err error) {
	began := time.Now()
	return func(err error) {
		elapsed := time.Since(began)
		if globalTrace.NumSubscribers(madmin.TraceReplicationResync) <= 0 {
			return
		}
		globalTrace.Publish(replicationResyncTrace(resyncID, began, elapsed, path, err))
	}
}
// replicationResyncTrace builds the trace record for one resync operation.
// The Error field is left empty when err is nil.
func replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo {
	info := madmin.TraceInfo{
		TraceType: madmin.TraceReplicationResync,
		Time:      startTime,
		NodeName:  globalLocalNodeName,
		FuncName:  "replication.(resyncID=" + resyncID + ")",
		Duration:  duration,
		Path:      path,
	}
	if err != nil {
		info.Error = err.Error()
	}
	return info
}
// deleteResyncMetadata removes the bucket's entry from the in-memory resync
// status map and from the site resync metrics. Both happen while the resyncer
// lock is held. It is a no-op on a nil pool.
func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket string) {
	if p == nil {
		return
	}
	p.resyncer.Lock()
	defer p.resyncer.Unlock()

	delete(p.resyncer.statusMap, bucket)
	globalSiteResyncMetrics.deleteBucket(bucket)
}
// initResync - initializes bucket replication resync for all buckets.
// It returns errServerNotInitialized when objAPI is nil; otherwise it starts
// startResyncRoutine in a goroutine and returns nil without waiting for it.
func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
	if objAPI == nil {
		return errServerNotInitialized
	}
	// Load bucket resync state in the background
	go p.startResyncRoutine(ctx, buckets, objAPI)
	return nil
}
// startResyncRoutine keeps calling loadResync until it succeeds, sleeping a
// random duration of at least one second and under a minute between attempts.
// After a successful load it blocks until ctx is done.
func (p *ReplicationPool) startResyncRoutine(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	// Run the replication resync in a loop
	for {
		err := p.loadResync(ctx, buckets, objAPI)
		if err == nil {
			<-ctx.Done()
			return
		}
		backoff := time.Duration(rng.Float64() * float64(time.Minute))
		// Make sure to sleep at least a second to avoid high CPU ticks.
		if backoff < time.Second {
			backoff = time.Second
		}
		time.Sleep(backoff)
	}
}
// loadResync loads bucket replication resync statuses into memory and restarts
// (with heal set) every resync whose status is failed, started or pending.
// Buckets whose metadata cannot be loaded are skipped. It always returns nil.
func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
	// Make sure only one node running resync on the cluster.
	ctx, cancel := globalLeaderLock.GetLock(ctx)
	defer cancel()

	for _, b := range buckets {
		meta, err := loadBucketResyncMetadata(ctx, b.Name, objAPI)
		if err != nil {
			if !errors.Is(err, errVolumeNotFound) {
				logger.LogIf(ctx, err)
			}
			continue
		}
		p.resyncer.Lock()
		p.resyncer.statusMap[b.Name] = meta
		p.resyncer.Unlock()
	}

	for _, b := range buckets {
		bucket := b.Name

		// Work on a copy of the target stats so the lock is not held while
		// goroutines are launched.
		var targets map[string]TargetReplicationResyncStatus
		p.resyncer.RLock()
		if m, ok := p.resyncer.statusMap[bucket]; ok {
			targets = m.cloneTgtStats()
		}
		p.resyncer.RUnlock()

		for arn, st := range targets {
			unfinished := st.ResyncStatus == ResyncFailed ||
				st.ResyncStatus == ResyncStarted ||
				st.ResyncStatus == ResyncPending
			if !unfinished {
				continue
			}
			go p.resyncer.resyncBucket(ctx, objAPI, true, resyncOpts{
				bucket:       bucket,
				arn:          arn,
				resyncID:     st.ResyncID,
				resyncBefore: st.ResyncBeforeDate,
			})
		}
	}
	return nil
}
// loadBucketResyncMetadata loads the bucket's resync metadata from disk.
// A missing or empty file yields a freshly initialized status and no error.
// The file starts with a 4-byte header (little-endian uint16 format, then
// uint16 version) followed by the msgp-encoded status.
func loadBucketResyncMetadata(ctx context.Context, bucket string, objAPI ObjectLayer) (brs BucketReplicationResyncStatus, e error) {
	brs = newBucketResyncStatus(bucket)

	resyncFile := pathJoin(path.Join(bucketMetaPrefix, bucket, replicationDir), resyncFileName)
	data, err := readConfig(GlobalContext, objAPI, resyncFile)
	if err != nil && err != errConfigNotFound {
		return brs, err
	}

	switch {
	case len(data) == 0:
		// Seems to be empty.
		return brs, nil
	case len(data) <= 4:
		return brs, fmt.Errorf("replication resync: no data")
	}

	// Read resync meta header
	if format := binary.LittleEndian.Uint16(data[0:2]); format != resyncMetaFormat {
		return brs, fmt.Errorf("resyncMeta: unknown format: %d", format)
	}
	if version := binary.LittleEndian.Uint16(data[2:4]); version != resyncMetaVersion {
		return brs, fmt.Errorf("resyncMeta: unknown version: %d", version)
	}

	// OK, parse data.
	if _, err = brs.UnmarshalMsg(data[4:]); err != nil {
		return brs, err
	}
	if brs.Version != resyncMetaVersionV1 {
		return brs, fmt.Errorf("unexpected resync meta version: %d", brs.Version)
	}
	return brs, nil
}
// saveResyncStatus writes the bucket's resync status to its resync file: a
// 4-byte header (format and version as little-endian uint16) followed by the
// msgp-encoded status.
func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationResyncStatus, objectAPI ObjectLayer) error {
	// Reserve capacity for the payload up front; only the header is length.
	header := make([]byte, 4, brs.Msgsize()+4)
	binary.LittleEndian.PutUint16(header[0:2], resyncMetaFormat)
	binary.LittleEndian.PutUint16(header[2:4], resyncMetaVersion)

	payload, err := brs.MarshalMsg(header)
	if err != nil {
		return err
	}
	return saveConfig(ctx, objectAPI, path.Join(bucketMetaPrefix, bucket, replicationDir, resyncFileName), payload)
}
// getReplicationDiff returns un-replicated objects in a channel.
// If a non-nil channel is returned it must be consumed fully or
// the provided context must be canceled.
//
// Object versions under opts.Prefix are walked and examined in a background
// goroutine. Versions whose status is Completed or Replica are reported only
// when opts.Verbose is set, and opts.ARN, when non-empty, limits the per-target
// details to that ARN. The returned channel is closed once the walk is
// exhausted. An error is returned (and logged) if the replication config, the
// bucket targets or the walk cannot be obtained.
func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (chan madmin.DiffInfo, error) {
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}
	tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
	if err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}
	objInfoCh := make(chan ObjectInfo, 10)
	if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil {
		logger.LogIf(ctx, err)
		return nil, err
	}
	rcfg := replicationConfig{
		Config:  cfg,
		remotes: tgts,
	}
	diffCh := make(chan madmin.DiffInfo, 4000)
	go func() {
		defer close(diffCh)
		for obj := range objInfoCh {
			if contextCanceled(ctx) {
				// Just consume input...
				continue
			}
			// Ignore object prefixes which are excluded
			// from versioning via the MinIO bucket versioning extension.
			if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) {
				continue
			}
			roi := getHealReplicateObjectInfo(obj, rcfg)
			switch roi.ReplicationStatus {
			case replication.Completed, replication.Replica:
				// already replicated: only reported in verbose mode
				if !opts.Verbose {
					continue
				}
				fallthrough
			default:
				// ignore pre-existing objects that don't satisfy replication rule(s)
				if roi.ReplicationStatus.Empty() && !roi.ExistingObjResync.mustResync() {
					continue
				}
				// Collect per-target replication status, filtered by opts.ARN.
				tgtsMap := make(map[string]madmin.TgtDiffInfo)
				for arn, st := range roi.TargetStatuses {
					if opts.ARN == "" || opts.ARN == arn {
						if !opts.Verbose && (st == replication.Completed || st == replication.Replica) {
							continue
						}
						tgtsMap[arn] = madmin.TgtDiffInfo{
							ReplicationStatus: st.String(),
						}
					}
				}
				// Merge in per-target purge status, creating the entry if needed.
				for arn, st := range roi.TargetPurgeStatuses {
					if opts.ARN == "" || opts.ARN == arn {
						if !opts.Verbose && st == Complete {
							continue
						}
						t, ok := tgtsMap[arn]
						if !ok {
							t = madmin.TgtDiffInfo{}
						}
						t.DeleteReplicationStatus = string(st)
						tgtsMap[arn] = t
					}
				}
				select {
				case diffCh <- madmin.DiffInfo{
					Object:                  obj.Name,
					VersionID:               obj.VersionID,
					LastModified:            obj.ModTime,
					IsDeleteMarker:          obj.DeleteMarker,
					ReplicationStatus:       string(roi.ReplicationStatus),
					DeleteReplicationStatus: string(roi.VersionPurgeStatus),
					ReplicationTimestamp:    roi.ReplicationTimestamp,
					Targets:                 tgtsMap,
				}:
				case <-ctx.Done():
					// keep looping so the remaining walk results are consumed
					continue
				}
			}
		}
	}()
	return diffCh, nil
}
// QueueReplicationHeal is a wrapper for queueReplicationHeal. It looks up the
// bucket's replication configuration and targets itself; lookup errors are
// ignored, and queueReplicationHeal returns early when either value is nil.
func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) {
	// un-versioned or a prefix
	if oi.VersionID == "" || oi.ModTime.IsZero() {
		return
	}
	cfg, _ := getReplicationConfig(ctx, bucket)
	remotes, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
	queueReplicationHeal(ctx, bucket, oi, replicationConfig{Config: cfg, remotes: remotes})
}
// queueReplicationHeal enqueues objects that failed replication OR eligible for resyncing through
// an ongoing resync operation or via existing objects replication configuration setting.
//
// It returns the ReplicateObjectInfo computed for oi; the zero value is returned
// when oi is un-versioned or a prefix, or when rcfg has no configuration or no
// remotes. Nothing is queued when no target needs replication or when the
// version is already replicated and needs no resync.
func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig) (roi ReplicateObjectInfo) {
	// un-versioned or a prefix
	if oi.VersionID == "" || oi.ModTime.IsZero() {
		return roi
	}
	if rcfg.Config == nil || rcfg.remotes == nil {
		return roi
	}
	roi = getHealReplicateObjectInfo(oi, rcfg)
	if !roi.Dsc.ReplicateAny() {
		return
	}
	// early return if replication already done, otherwise we need to determine if this
	// version is an existing object that needs healing.
	if oi.ReplicationStatus == replication.Completed && oi.VersionPurgeStatus.Empty() && !roi.ExistingObjResync.mustResync() {
		return
	}
	if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
		// Delete marker or version purge: exactly one of the two version IDs is set.
		versionID := ""
		dmVersionID := ""
		if roi.VersionPurgeStatus.Empty() {
			dmVersionID = roi.VersionID
		} else {
			versionID = roi.VersionID
		}
		dv := DeletedObjectReplicationInfo{
			DeletedObject: DeletedObject{
				ObjectName:            roi.Name,
				DeleteMarkerVersionID: dmVersionID,
				VersionID:             versionID,
				ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
				DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
				DeleteMarker:          roi.DeleteMarker,
			},
			Bucket:    roi.Bucket,
			OpType:    replication.HealReplicationType,
			EventType: ReplicateHealDelete,
		}
		// heal delete marker replication failure or versioned delete replication failure
		if roi.ReplicationStatus == replication.Pending ||
			roi.ReplicationStatus == replication.Failed ||
			roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending {
			globalReplicationPool.queueReplicaDeleteTask(dv)
			return
		}
		// if replication status is Complete on DeleteMarker and existing object resync required
		if roi.ExistingObjResync.mustResync() && (roi.ReplicationStatus == replication.Completed || roi.ReplicationStatus.Empty()) {
			queueReplicateDeletesWrapper(dv, roi.ExistingObjResync)
			return
		}
		return
	}
	// Regular object version: tag the op type before queueing when a resync is due.
	if roi.ExistingObjResync.mustResync() {
		roi.OpType = replication.ExistingObjectReplicationType
	}
	// Pending or failed replication is queued as a heal, ahead of the resync check.
	switch roi.ReplicationStatus {
	case replication.Pending, replication.Failed:
		roi.EventType = ReplicateHeal
		globalReplicationPool.queueReplicaTask(roi)
		return
	}
	if roi.ExistingObjResync.mustResync() {
		roi.EventType = ReplicateExisting
		globalReplicationPool.queueReplicaTask(roi)
	}
	return
}
// mrfTimeInterval is the timer period used by persistMRF and processMRF.
const mrfTimeInterval = 5 * time.Minute
// persistMRF collects MRF entries received on p.mrfSaveCh, keyed by version ID,
// and saves them to disk when the timer fires, when the batch or the channel
// backlog grows large, on a save-state request, or when the pool context is
// done. It returns when the context is done, when a save-state request has been
// handled, or when p.mrfSaveCh is closed. It is a no-op on an uninitialized pool.
func (p *ReplicationPool) persistMRF() {
	if !p.initialized() {
		return
	}
	var mu sync.Mutex
	entries := make(map[string]MRFReplicateEntry)
	mTimer := time.NewTimer(mrfTimeInterval)
	defer mTimer.Stop()
	// saveMRFToDisk saves the collected entries and then starts a fresh batch,
	// whether or not the save succeeded. With drain set it first reads everything
	// from p.mrfSaveCh into the batch and saves with a background context.
	// NOTE(review): the empty-batch check runs before draining, and the drain loop
	// only ends once p.mrfSaveCh is closed; in this function only the
	// context-done path closes it - confirm the other drain callers cannot block.
	saveMRFToDisk := func(drain bool) {
		mu.Lock()
		defer mu.Unlock()
		if len(entries) == 0 {
			return
		}
		cctx := p.ctx
		if drain {
			cctx = context.Background()
			// drain all mrf entries and save to disk
			for e := range p.mrfSaveCh {
				entries[e.versionID] = e
			}
		}
		if err := p.saveMRFEntries(cctx, entries); err != nil {
			logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
		}
		entries = make(map[string]MRFReplicateEntry)
	}
	for {
		select {
		case <-mTimer.C:
			saveMRFToDisk(false)
			mTimer.Reset(mrfTimeInterval)
		case <-p.ctx.Done():
			// signal the stop channel, then close the save channel so the drain terminates
			p.mrfStopCh <- struct{}{}
			close(p.mrfSaveCh)
			saveMRFToDisk(true)
			return
		case <-p.saveStateCh:
			saveMRFToDisk(true)
			return
		case e, ok := <-p.mrfSaveCh:
			if !ok {
				return
			}
			var cnt int
			mu.Lock()
			entries[e.versionID] = e
			cnt = len(entries)
			mu.Unlock()
			// Save once the batch reaches the channel capacity or the channel is 80% full.
			if cnt >= cap(p.mrfSaveCh) || len(p.mrfSaveCh) >= int(0.8*float32(cap(p.mrfSaveCh))) {
				saveMRFToDisk(true)
			}
		}
	}
}
// queueMRFSave hands entry to the MRF persistence channel without blocking.
// The entry is dropped when the pool is uninitialized, when the global context
// is done, when a stop signal is available on p.mrfStopCh, or when the save
// channel is full.
func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
	if !p.initialized() {
		return
	}

	// Bail out if shutdown has been signalled.
	select {
	case <-GlobalContext.Done():
		return
	case <-p.mrfStopCh:
		return
	default:
	}

	// Best-effort, non-blocking enqueue.
	select {
	case p.mrfSaveCh <- entry:
	default:
	}
}
// saveMRFEntries saves the given MRF (replication failure) entries to a new
// "<uuid>.bin" file under replicationMRFDir.
//
// The file consists of a 4-byte header (MRF format and version as
// little-endian uint16 values) followed by the msgp-encoded entries. The
// header is written with the same mrfMetaFormat/mrfMetaVersion constants that
// loadMRF validates, so the writer and reader cannot drift apart.
//
// It returns nil without writing anything when the pool is not initialized or
// entries is empty, and otherwise any marshalling or save error.
func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error {
	if !p.initialized() {
		return nil
	}
	if len(entries) == 0 {
		return nil
	}
	v := MRFReplicateEntries{
		Entries: entries,
		Version: mrfMetaVersionV1,
	}
	// Reserve capacity for the payload up front; only the header is length.
	data := make([]byte, 4, v.Msgsize()+4)

	// Initialize the mrf meta header.
	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)

	buf, err := v.MarshalMsg(data)
	if err != nil {
		return err
	}
	configFile := path.Join(replicationMRFDir, mustGetUUID()+".bin")
	return saveConfig(ctx, p.objLayer, configFile, buf)
}
// loadMRF loads MRF entries from the file fileName. An uninitialized pool or a
// missing or empty file yields the zero value and no error. The file must
// start with a 4-byte header (MRF format and version as little-endian uint16)
// followed by the msgp-encoded entries.
func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e error) {
	if !p.initialized() {
		return re, nil
	}
	data, err := readConfig(p.ctx, p.objLayer, fileName)
	if err != nil && err != errConfigNotFound {
		return re, err
	}

	switch {
	case len(data) == 0:
		// Seems to be empty.
		return re, nil
	case len(data) <= 4:
		return re, fmt.Errorf("replication mrf: no data")
	}

	// Read mrf meta header
	if format := binary.LittleEndian.Uint16(data[0:2]); format != mrfMetaFormat {
		return re, fmt.Errorf("replication mrf: unknown format: %d", format)
	}
	if version := binary.LittleEndian.Uint16(data[2:4]); version != mrfMetaVersion {
		return re, fmt.Errorf("replication mrf: unknown version: %d", version)
	}

	// OK, parse data.
	if _, err = re.UnmarshalMsg(data[4:]); err != nil {
		return re, err
	}
	if re.Version != mrfMetaVersionV1 {
		return re, fmt.Errorf("unexpected mrf meta version: %d", re.Version)
	}
	return re, nil
}
// processMRF periodically (every mrfTimeInterval) walks the saved MRF files
// and queues their entries for replication healing. A file is deleted once it
// has been queued without error. A round is skipped when every listed target
// is offline. The loop ends when the pool context is done.
func (p *ReplicationPool) processMRF() {
	if !p.initialized() {
		return
	}
	timer := time.NewTimer(mrfTimeInterval)
	defer timer.Stop()

	for {
		select {
		case <-p.ctx.Done():
			return
		case <-timer.C:
		}

		// skip healing if all targets are offline
		targets := globalBucketTargetSys.ListTargets(p.ctx, "", "")
		offline := 0
		for _, t := range targets {
			if globalBucketTargetSys.isOffline(t.URL()) {
				offline++
			}
		}
		if offline == len(targets) {
			timer.Reset(mrfTimeInterval)
			continue
		}

		filesCh := make(chan ObjectInfo)
		walkCtx, stopWalk := context.WithCancel(p.ctx)
		if err := p.objLayer.Walk(walkCtx, minioMetaBucket, replicationMRFDir, filesCh, ObjectOptions{}); err != nil {
			timer.Reset(mrfTimeInterval)
			stopWalk()
			logger.LogIf(p.ctx, err)
			continue
		}
		for item := range filesCh {
			if p.queueMRFHeal(item.Name) != nil {
				// keep the file so a later round can retry it
				continue
			}
			p.objLayer.DeleteObject(p.ctx, minioMetaBucket, item.Name, ObjectOptions{})
		}
		timer.Reset(mrfTimeInterval)
		stopWalk()
	}
}
// queueMRFHeal loads the MRF entries stored in file and queues a replication
// heal for every entry whose object version can still be looked up. Entries
// whose lookup fails are skipped. It returns errServerNotInitialized on an
// uninitialized pool, or the error from loading the file.
func (p *ReplicationPool) queueMRFHeal(file string) error {
	if !p.initialized() {
		return errServerNotInitialized
	}
	rec, err := p.loadMRF(file)
	if err != nil {
		return err
	}
	for versionID, entry := range rec.Entries {
		oi, err := p.objLayer.GetObjectInfo(p.ctx, entry.Bucket, entry.Object, ObjectOptions{
			VersionID: versionID,
		})
		if err == nil {
			QueueReplicationHeal(p.ctx, entry.Bucket, oi)
		}
	}
	return nil
}
// loadStatsFromDisk loads the persisted replication stats from disk and
// returns them keyed by bucket name.
//
// An empty, non-nil map with a nil error is returned when the pool is not
// initialized, when no stats file exists yet, or when the file holds no
// payload beyond the 4-byte header. Any other read failure, an unknown header
// format or version, and a decode failure are returned as errors.
func (p *ReplicationPool) loadStatsFromDisk() (rs map[string]BucketReplicationStats, e error) {
	if !p.initialized() {
		return map[string]BucketReplicationStats{}, nil
	}

	data, err := readConfig(p.ctx, p.objLayer, getReplicationStatsPath())
	if err != nil {
		// A missing stats file just means nothing was saved yet; only real
		// read failures are reported to the caller.
		if errors.Is(err, errConfigNotFound) {
			return map[string]BucketReplicationStats{}, nil
		}
		return rs, err
	}

	if len(data) <= 4 {
		logger.LogIf(p.ctx, fmt.Errorf("replication stats: no data"))
		return map[string]BucketReplicationStats{}, nil
	}
	// Read repl stats meta header
	switch binary.LittleEndian.Uint16(data[0:2]) {
	case replStatsMetaFormat:
	default:
		return rs, fmt.Errorf("replication stats: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
	}
	switch binary.LittleEndian.Uint16(data[2:4]) {
	case replStatsVersion:
	default:
		return rs, fmt.Errorf("replication stats: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
	}
	ss := BucketStatsMap{}
	if _, err = ss.UnmarshalMsg(data[4:]); err != nil {
		return rs, err
	}
	rs = make(map[string]BucketReplicationStats, len(ss.Stats))
	for bucket, st := range ss.Stats {
		rs[bucket] = st.ReplicationStats
	}
	return rs, nil
}
// initialized reports whether the pool is non-nil and has an object layer set.
func (p *ReplicationPool) initialized() bool {
	return p != nil && p.objLayer != nil
}
// saveStatsToDisk periodically (every replStatsSaveInterval) refreshes the
// replication stats from the stored data usage info, when one with a non-zero
// LastUpdate is available, and then saves the stats to disk. It runs until the
// context obtained from the global leader lock is done. The result of each
// save is not checked. It is a no-op on an uninitialized pool.
func (p *ReplicationPool) saveStatsToDisk() {
	if !p.initialized() {
		return
	}
	leaderCtx, release := globalLeaderLock.GetLock(p.ctx)
	defer release()

	saveTimer := time.NewTimer(replStatsSaveInterval)
	defer saveTimer.Stop()

	for {
		select {
		case <-leaderCtx.Done():
			return
		case <-saveTimer.C:
		}

		usage, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn())
		if err == nil && !usage.LastUpdate.IsZero() {
			globalReplicationStats.getAllLatest(usage.BucketsUsage)
		}
		p.saveStats(p.ctx)
		saveTimer.Reset(replStatsSaveInterval)
	}
}
// saveStats serializes the global replication stats and saves them to the
// replication stats path (.minio.sys/buckets/replication/node-name.stats).
// When serialization yields no data, the serialization error (possibly nil) is
// returned and nothing is written. It is a no-op on an uninitialized pool.
func (p *ReplicationPool) saveStats(ctx context.Context) error {
	if !p.initialized() {
		return nil
	}
	payload, err := globalReplicationStats.serializeStats()
	if payload != nil {
		return saveConfig(ctx, p.objLayer, getReplicationStatsPath(), payload)
	}
	return err
}