2021-04-18 15:41:13 -04:00
|
|
|
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
|
|
//
|
|
|
|
// This file is part of MinIO Object Storage stack
|
|
|
|
//
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
//
|
|
|
|
// This program is distributed in the hope that it will be useful
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU Affero General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
2020-07-21 20:49:56 -04:00
|
|
|
|
|
|
|
package cmd
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2020-09-15 23:44:48 -04:00
|
|
|
"fmt"
|
2021-06-30 10:44:24 -04:00
|
|
|
"io"
|
2020-07-21 20:49:56 -04:00
|
|
|
"net/http"
|
2021-02-03 23:41:33 -05:00
|
|
|
"reflect"
|
2020-08-12 20:32:24 -04:00
|
|
|
"strings"
|
2021-03-09 05:56:42 -05:00
|
|
|
"sync"
|
2020-07-21 20:49:56 -04:00
|
|
|
"time"
|
|
|
|
|
2021-05-06 11:52:02 -04:00
|
|
|
"github.com/minio/madmin-go"
|
2021-08-23 11:16:18 -04:00
|
|
|
"github.com/minio/minio-go/v7"
|
2020-07-21 20:49:56 -04:00
|
|
|
miniogo "github.com/minio/minio-go/v7"
|
|
|
|
"github.com/minio/minio-go/v7/pkg/encrypt"
|
|
|
|
"github.com/minio/minio-go/v7/pkg/tags"
|
2021-06-01 17:59:40 -04:00
|
|
|
"github.com/minio/minio/internal/bucket/bandwidth"
|
|
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
|
|
"github.com/minio/minio/internal/config/storageclass"
|
|
|
|
"github.com/minio/minio/internal/crypto"
|
|
|
|
"github.com/minio/minio/internal/event"
|
2021-06-30 10:44:24 -04:00
|
|
|
"github.com/minio/minio/internal/hash"
|
2021-06-01 17:59:40 -04:00
|
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
|
|
"github.com/minio/minio/internal/logger"
|
2020-07-21 20:49:56 -04:00
|
|
|
)
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
// throttleDeadline is a fixed one hour duration.
// NOTE(review): its consumers are outside this part of the file - confirm usage.
const throttleDeadline = 1 * time.Hour

// Names of the internal metadata entries used to track replication state
// for an object version.
const (
	// ReplicationReset holds the reset id and the timestamp of the last reset operation.
	ReplicationReset = "replication-reset"

	// ReplicationStatus holds the internal replication status: a stringified
	// representation of each target's replication status for all replication
	// activity initiated from this cluster.
	ReplicationStatus = "replication-status"

	// ReplicationTimestamp is the last time replication was initiated on this
	// cluster for this object version.
	ReplicationTimestamp = "replication-timestamp"

	// ReplicaStatus is present when a replica was received by this cluster
	// for this object version.
	ReplicaStatus = "replica-status"

	// ReplicaTimestamp is the last time a replica was received by this cluster
	// for this object version.
	ReplicaTimestamp = "replica-timestamp"

	// TaggingTimestamp is the last time a tag metadata modification happened
	// on this cluster for this object version.
	TaggingTimestamp = "tagging-timestamp"

	// ObjectLockRetentionTimestamp is the last time an object lock retention
	// metadata modification happened on this cluster for this object version.
	ObjectLockRetentionTimestamp = "objectlock-retention-timestamp"

	// ObjectLockLegalHoldTimestamp is the last time a legal hold metadata
	// modification happened on this cluster for this object version.
	ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp"
)
|
2021-07-28 18:20:01 -04:00
|
|
|
|
2020-07-30 22:55:22 -04:00
|
|
|
// gets replication config associated to a given bucket name.
|
|
|
|
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
|
2020-07-21 20:49:56 -04:00
|
|
|
if globalIsGateway {
|
2020-10-09 12:59:52 -04:00
|
|
|
objAPI := newObjectLayerFn()
|
2020-07-21 20:49:56 -04:00
|
|
|
if objAPI == nil {
|
2021-09-18 16:31:35 -04:00
|
|
|
return rc, errServerNotInitialized
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
return rc, BucketReplicationConfigNotFound{Bucket: bucketName}
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
return globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
|
|
|
|
}
|
|
|
|
|
2020-07-30 22:55:22 -04:00
|
|
|
// validateReplicationDestination returns error if replication destination bucket missing or not configured
|
2020-07-21 20:49:56 -04:00
|
|
|
// It also returns true if replication destination is same as this server.
|
2021-09-21 16:03:20 -04:00
|
|
|
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, APIError) {
|
2021-09-18 16:31:35 -04:00
|
|
|
var arns []string
|
|
|
|
if rCfg.RoleArn != "" {
|
|
|
|
arns = append(arns, rCfg.RoleArn)
|
|
|
|
} else {
|
|
|
|
for _, rule := range rCfg.Rules {
|
|
|
|
arns = append(arns, rule.Destination.String())
|
|
|
|
}
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
for _, arnStr := range arns {
|
|
|
|
arn, err := madmin.ParseARN(arnStr)
|
|
|
|
if err != nil {
|
2021-09-21 16:03:20 -04:00
|
|
|
return false, errorCodes.ToAPIErrWithErr(ErrBucketRemoteArnInvalid, err)
|
2021-09-18 16:31:35 -04:00
|
|
|
}
|
|
|
|
if arn.Type != madmin.ReplicationService {
|
2021-09-21 16:03:20 -04:00
|
|
|
return false, toAPIError(ctx, BucketRemoteArnTypeInvalid{Bucket: bucket})
|
2021-09-18 16:31:35 -04:00
|
|
|
}
|
|
|
|
clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arnStr)
|
|
|
|
if clnt == nil {
|
2021-09-21 16:03:20 -04:00
|
|
|
return false, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
|
2021-09-18 16:31:35 -04:00
|
|
|
}
|
2021-09-21 16:03:20 -04:00
|
|
|
if found, err := clnt.BucketExists(ctx, arn.Bucket); !found {
|
|
|
|
return false, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, err)
|
2021-09-18 16:31:35 -04:00
|
|
|
}
|
|
|
|
if ret, err := globalBucketObjectLockSys.Get(bucket); err == nil {
|
|
|
|
if ret.LockEnabled {
|
|
|
|
lock, _, _, _, err := clnt.GetObjectLockConfig(ctx, arn.Bucket)
|
|
|
|
if err != nil || lock != "Enabled" {
|
2021-09-21 16:03:20 -04:00
|
|
|
return false, errorCodes.ToAPIErrWithErr(ErrReplicationDestinationMissingLock, err)
|
2021-09-18 16:31:35 -04:00
|
|
|
}
|
2020-08-05 02:02:27 -04:00
|
|
|
}
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
// validate replication ARN against target endpoint
|
|
|
|
c, ok := globalBucketTargetSys.arnRemotesMap[arnStr]
|
|
|
|
if ok {
|
|
|
|
if c.EndpointURL().String() == clnt.EndpointURL().String() {
|
|
|
|
sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
|
2021-09-21 16:03:20 -04:00
|
|
|
return sameTarget, toAPIError(ctx, nil)
|
2021-09-18 16:31:35 -04:00
|
|
|
}
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
|
|
|
}
|
2021-09-21 16:03:20 -04:00
|
|
|
return false, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
|
|
|
|
2021-06-01 22:59:11 -04:00
|
|
|
type mustReplicateOptions struct {
|
2021-09-18 16:31:35 -04:00
|
|
|
meta map[string]string
|
|
|
|
status replication.StatusType
|
|
|
|
opType replication.Type
|
|
|
|
replicationRequest bool // incoming request is a replication request
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
|
|
|
|
if rs, ok := o.meta[xhttp.AmzBucketReplicationStatus]; ok {
|
|
|
|
return replication.StatusType(rs)
|
|
|
|
}
|
|
|
|
return s
|
|
|
|
}
|
|
|
|
func (o mustReplicateOptions) isExistingObjectReplication() bool {
|
|
|
|
return o.opType == replication.ExistingObjectReplicationType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o mustReplicateOptions) isMetadataReplication() bool {
|
|
|
|
return o.opType == replication.MetadataReplicationType
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
func getMustReplicateOptions(o ObjectInfo, op replication.Type, opts ObjectOptions) mustReplicateOptions {
|
2021-06-01 22:59:11 -04:00
|
|
|
if !op.Valid() {
|
|
|
|
op = replication.ObjectReplicationType
|
|
|
|
if o.metadataOnly {
|
|
|
|
op = replication.MetadataReplicationType
|
|
|
|
}
|
|
|
|
}
|
|
|
|
meta := cloneMSS(o.UserDefined)
|
|
|
|
if o.UserTags != "" {
|
|
|
|
meta[xhttp.AmzObjectTagging] = o.UserTags
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
|
2021-06-01 22:59:11 -04:00
|
|
|
return mustReplicateOptions{
|
2021-09-18 16:31:35 -04:00
|
|
|
meta: meta,
|
|
|
|
status: o.ReplicationStatus,
|
|
|
|
opType: op,
|
|
|
|
replicationRequest: opts.ReplicationRequest,
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
|
|
|
}
|
2021-04-29 22:01:43 -04:00
|
|
|
|
2021-01-12 01:36:51 -05:00
|
|
|
// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
|
|
|
|
// a synchronous manner.
|
2021-09-18 16:31:35 -04:00
|
|
|
func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (dsc ReplicateDecision) {
|
2020-07-21 20:49:56 -04:00
|
|
|
if globalIsGateway {
|
2021-09-18 16:31:35 -04:00
|
|
|
return
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
|
2021-06-01 22:59:11 -04:00
|
|
|
replStatus := mopts.ReplicationStatus()
|
|
|
|
if replStatus == replication.Replica && !mopts.isMetadataReplication() {
|
2021-09-18 16:31:35 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if mopts.replicationRequest { // incoming replication request on target cluster
|
|
|
|
return
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2020-07-30 22:55:22 -04:00
|
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
2020-07-21 20:49:56 -04:00
|
|
|
if err != nil {
|
2021-09-18 16:31:35 -04:00
|
|
|
return
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
|
|
|
opts := replication.ObjectOpts{
|
2021-06-01 22:59:11 -04:00
|
|
|
Name: object,
|
|
|
|
SSEC: crypto.SSEC.IsEncrypted(mopts.meta),
|
|
|
|
Replica: replStatus == replication.Replica,
|
|
|
|
ExistingObject: mopts.isExistingObjectReplication(),
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2021-06-01 22:59:11 -04:00
|
|
|
tagStr, ok := mopts.meta[xhttp.AmzObjectTagging]
|
2020-07-21 20:49:56 -04:00
|
|
|
if ok {
|
|
|
|
opts.UserTags = tagStr
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
tgtArns := cfg.FilterTargetArns(opts)
|
|
|
|
for _, tgtArn := range tgtArns {
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
|
|
|
|
// the target online status should not be used here while deciding
|
|
|
|
// whether to replicate as the target could be temporarily down
|
|
|
|
opts.TargetArn = tgtArn
|
|
|
|
replicate := cfg.Replicate(opts)
|
|
|
|
var synchronous bool
|
|
|
|
if tgt != nil {
|
|
|
|
synchronous = tgt.replicateSync
|
|
|
|
}
|
|
|
|
dsc.Set(newReplicateTargetDecision(tgtArn, replicate, synchronous))
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
return dsc
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
// Standard headers that needs to be extracted from User metadata.
|
|
|
|
var standardHeaders = []string{
|
2021-01-27 14:22:34 -05:00
|
|
|
xhttp.ContentType,
|
|
|
|
xhttp.CacheControl,
|
|
|
|
xhttp.ContentEncoding,
|
|
|
|
xhttp.ContentLanguage,
|
|
|
|
xhttp.ContentDisposition,
|
2021-01-12 01:36:51 -05:00
|
|
|
xhttp.AmzStorageClass,
|
|
|
|
xhttp.AmzObjectTagging,
|
|
|
|
xhttp.AmzBucketReplicationStatus,
|
2021-01-27 14:22:34 -05:00
|
|
|
xhttp.AmzObjectLockMode,
|
|
|
|
xhttp.AmzObjectLockRetainUntilDate,
|
|
|
|
xhttp.AmzObjectLockLegalHold,
|
|
|
|
xhttp.AmzTagCount,
|
|
|
|
xhttp.AmzServerSideEncryption,
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
|
|
|
|
2020-11-19 21:43:58 -05:00
|
|
|
// returns true if any of the objects being deleted qualifies for replication.
|
|
|
|
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
|
|
|
|
c, err := getReplicationConfig(ctx, bucket)
|
|
|
|
if err != nil || c == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
for _, obj := range objects {
|
|
|
|
if c.HasActiveRules(obj.ObjectName, true) {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2021-01-12 01:36:51 -05:00
|
|
|
// isStandardHeader returns true if header is a supported header and not a custom header
|
2021-02-03 23:41:33 -05:00
|
|
|
func isStandardHeader(matchHeaderKey string) bool {
|
|
|
|
return equals(matchHeaderKey, standardHeaders...)
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
|
|
|
|
2020-11-19 21:43:58 -05:00
|
|
|
// returns whether object version is a deletemarker and if object qualifies for replication
|
2021-09-18 16:31:35 -04:00
|
|
|
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, delOpts ObjectOptions, gerr error) (dsc ReplicateDecision) {
|
2020-11-19 21:43:58 -05:00
|
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
|
|
if err != nil || rcfg == nil {
|
2021-09-18 16:31:35 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
// If incoming request is a replication request, it does not need to be re-replicated.
|
|
|
|
if delOpts.ReplicationRequest {
|
|
|
|
return
|
2020-11-19 21:43:58 -05:00
|
|
|
}
|
2021-02-18 19:35:37 -05:00
|
|
|
opts := replication.ObjectOpts{
|
|
|
|
Name: dobj.ObjectName,
|
|
|
|
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
|
|
|
|
UserTags: oi.UserTags,
|
|
|
|
DeleteMarker: oi.DeleteMarker,
|
|
|
|
VersionID: dobj.VersionID,
|
2021-03-13 13:28:35 -05:00
|
|
|
OpType: replication.DeleteReplicationType,
|
2021-02-18 19:35:37 -05:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
tgtArns := rcfg.FilterTargetArns(opts)
|
|
|
|
if len(tgtArns) > 0 {
|
|
|
|
dsc.targetsMap = make(map[string]replicateTargetDecision, len(tgtArns))
|
|
|
|
var sync, replicate bool
|
|
|
|
for _, tgtArn := range tgtArns {
|
|
|
|
opts.TargetArn = tgtArn
|
|
|
|
replicate = rcfg.Replicate(opts)
|
|
|
|
// when incoming delete is removal of a delete marker( a.k.a versioned delete),
|
|
|
|
// GetObjectInfo returns extra information even though it returns errFileNotFound
|
|
|
|
if gerr != nil {
|
|
|
|
validReplStatus := false
|
|
|
|
switch oi.TargetReplicationStatus(tgtArn) {
|
|
|
|
case replication.Pending, replication.Completed, replication.Failed:
|
|
|
|
validReplStatus = true
|
|
|
|
}
|
|
|
|
if oi.DeleteMarker && (validReplStatus || replicate) {
|
|
|
|
dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
|
|
|
|
continue
|
|
|
|
} else {
|
|
|
|
// can be the case that other cluster is down and duplicate `mc rm --vid`
|
|
|
|
// is issued - this still needs to be replicated back to the other target
|
|
|
|
replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed
|
|
|
|
dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
|
|
|
|
// the target online status should not be used here while deciding
|
|
|
|
// whether to replicate deletes as the target could be temporarily down
|
|
|
|
tgtDsc := newReplicateTargetDecision(tgtArn, false, false)
|
|
|
|
if tgt != nil {
|
|
|
|
tgtDsc = newReplicateTargetDecision(tgtArn, replicate, tgt.replicateSync)
|
|
|
|
}
|
|
|
|
dsc.Set(tgtDsc)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return dsc
|
2020-11-19 21:43:58 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
// replicate deletes to the designated replication target if replication configuration
|
|
|
|
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
|
|
|
|
// is specified) enabled.
|
|
|
|
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
|
|
|
|
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
|
|
|
|
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
|
|
|
|
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
|
|
|
|
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
|
|
|
|
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
|
|
|
|
// on target.
|
2021-07-01 17:02:44 -04:00
|
|
|
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, trigger string) {
|
2021-09-18 16:31:35 -04:00
|
|
|
var replicationStatus replication.StatusType
|
2020-11-19 21:43:58 -05:00
|
|
|
bucket := dobj.Bucket
|
2021-02-03 23:41:33 -05:00
|
|
|
versionID := dobj.DeleteMarkerVersionID
|
|
|
|
if versionID == "" {
|
|
|
|
versionID = dobj.VersionID
|
|
|
|
}
|
|
|
|
|
2021-07-01 17:02:44 -04:00
|
|
|
defer func() {
|
2021-09-18 16:31:35 -04:00
|
|
|
replStatus := string(replicationStatus)
|
2021-07-01 17:02:44 -04:00
|
|
|
auditLogInternal(context.Background(), bucket, dobj.ObjectName, AuditLogOptions{
|
|
|
|
Trigger: trigger,
|
|
|
|
APIName: ReplicateDeleteAPI,
|
|
|
|
VersionID: versionID,
|
|
|
|
Status: replStatus,
|
|
|
|
})
|
|
|
|
}()
|
|
|
|
|
2020-11-19 21:43:58 -05:00
|
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
|
|
if err != nil || rcfg == nil {
|
2021-01-27 14:22:34 -05:00
|
|
|
logger.LogIf(ctx, err)
|
2021-02-03 23:41:33 -05:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: ObjectInfo{
|
|
|
|
Bucket: bucket,
|
|
|
|
Name: dobj.ObjectName,
|
|
|
|
VersionID: versionID,
|
|
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
|
|
},
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
})
|
2020-11-19 21:43:58 -05:00
|
|
|
return
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
dsc, err := parseReplicateDecision(dobj.ReplicationState.ReplicateDecisionStr)
|
|
|
|
if err != nil {
|
|
|
|
logger.LogIf(ctx, err)
|
2021-02-03 23:41:33 -05:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: ObjectInfo{
|
|
|
|
Bucket: bucket,
|
|
|
|
Name: dobj.ObjectName,
|
|
|
|
VersionID: versionID,
|
|
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
|
|
},
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
})
|
2020-11-19 21:43:58 -05:00
|
|
|
return
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2021-08-23 11:16:18 -04:00
|
|
|
// Lock the object name before starting replication operation.
|
|
|
|
// Use separate lock that doesn't collide with regular objects.
|
|
|
|
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
|
|
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
|
|
if err != nil {
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn))
|
|
|
|
sendEvent(eventArgs{
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: ObjectInfo{
|
|
|
|
Bucket: bucket,
|
|
|
|
Name: dobj.ObjectName,
|
|
|
|
VersionID: versionID,
|
|
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
|
|
},
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
})
|
|
|
|
return
|
|
|
|
}
|
|
|
|
ctx = lkctx.Context()
|
|
|
|
defer lk.Unlock(lkctx.Cancel)
|
2020-11-19 21:43:58 -05:00
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
var wg sync.WaitGroup
|
|
|
|
var rinfos replicatedInfos
|
|
|
|
rinfos.Targets = make([]replicatedTargetInfo, len(dsc.targetsMap))
|
|
|
|
idx := -1
|
|
|
|
for tgtArn := range dsc.targetsMap {
|
|
|
|
idx++
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
|
|
|
|
if tgt == nil {
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn))
|
|
|
|
sendEvent(eventArgs{
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: ObjectInfo{
|
|
|
|
Bucket: bucket,
|
|
|
|
Name: dobj.ObjectName,
|
|
|
|
VersionID: versionID,
|
|
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
|
|
},
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
})
|
|
|
|
continue
|
2020-11-19 21:43:58 -05:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
if tgt := dsc.targetsMap[tgtArn]; !tgt.Replicate {
|
|
|
|
continue
|
2020-11-19 21:43:58 -05:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
// if dobj.TargetArn is not empty string, this is a case of specific target being re-synced.
|
|
|
|
if dobj.TargetArn != "" && dobj.TargetArn != tgt.ARN {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
wg.Add(1)
|
|
|
|
go func(index int, tgt *TargetClient) {
|
|
|
|
defer wg.Done()
|
|
|
|
rinfo := replicateDeleteToTarget(ctx, dobj, objectAPI, tgt)
|
|
|
|
rinfos.Targets[index] = rinfo
|
|
|
|
}(idx, tgt)
|
2020-11-19 21:43:58 -05:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
wg.Wait()
|
|
|
|
|
|
|
|
replicationStatus = rinfos.ReplicationStatus()
|
|
|
|
prevStatus := dobj.DeleteMarkerReplicationStatus()
|
|
|
|
|
2021-04-03 12:03:42 -04:00
|
|
|
if dobj.VersionID != "" {
|
2021-09-18 16:31:35 -04:00
|
|
|
prevStatus = replication.StatusType(dobj.VersionPurgeStatus())
|
|
|
|
replicationStatus = replication.StatusType(rinfos.VersionPurgeStatus())
|
2021-04-03 12:03:42 -04:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
|
2021-04-04 18:34:33 -04:00
|
|
|
// to decrement pending count later.
|
2021-09-18 16:31:35 -04:00
|
|
|
for _, rinfo := range rinfos.Targets {
|
|
|
|
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
|
|
|
|
globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, replicationStatus,
|
|
|
|
prevStatus, replication.DeleteReplicationType)
|
|
|
|
}
|
|
|
|
}
|
2021-01-25 17:04:41 -05:00
|
|
|
|
2020-11-22 02:48:50 -05:00
|
|
|
var eventName = event.ObjectReplicationComplete
|
2021-09-18 16:31:35 -04:00
|
|
|
if replicationStatus == replication.Failed {
|
2020-11-22 02:48:50 -05:00
|
|
|
eventName = event.ObjectReplicationFailed
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID)
|
2021-02-03 23:41:33 -05:00
|
|
|
dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
|
2021-09-18 16:31:35 -04:00
|
|
|
VersionID: versionID,
|
|
|
|
MTime: dobj.DeleteMarkerMTime.Time,
|
|
|
|
DeleteReplication: drs,
|
|
|
|
Versioned: globalBucketVersioningSys.Enabled(bucket),
|
|
|
|
VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
|
2021-01-25 17:04:41 -05:00
|
|
|
})
|
2021-02-09 18:11:43 -05:00
|
|
|
if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on.
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err))
|
2021-02-03 23:41:33 -05:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: ObjectInfo{
|
|
|
|
Bucket: bucket,
|
|
|
|
Name: dobj.ObjectName,
|
|
|
|
VersionID: versionID,
|
|
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
|
|
},
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
EventName: eventName,
|
|
|
|
})
|
|
|
|
} else {
|
|
|
|
sendEvent(eventArgs{
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: dobjInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
EventName: eventName,
|
|
|
|
})
|
2020-11-19 21:43:58 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
|
|
|
|
versionID := dobj.DeleteMarkerVersionID
|
|
|
|
if versionID == "" {
|
|
|
|
versionID = dobj.VersionID
|
|
|
|
}
|
|
|
|
|
|
|
|
rinfo = dobj.ReplicationState.targetState(tgt.ARN)
|
|
|
|
rinfo.OpType = dobj.OpType
|
|
|
|
defer func() {
|
|
|
|
if rinfo.ReplicationStatus == replication.Completed && tgt.ResetID != "" && dobj.OpType == replication.ExistingObjectReplicationType {
|
|
|
|
rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
if dobj.VersionID == "" && rinfo.PrevReplicationStatus == replication.Completed && dobj.OpType != replication.ExistingObjectReplicationType {
|
|
|
|
rinfo.ReplicationStatus = rinfo.PrevReplicationStatus
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if dobj.VersionID != "" && rinfo.VersionPurgeStatus == Complete {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if tgt.IsOffline() {
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", dobj.Bucket, tgt.ARN))
|
|
|
|
sendEvent(eventArgs{
|
|
|
|
BucketName: dobj.Bucket,
|
|
|
|
Object: ObjectInfo{
|
|
|
|
Bucket: dobj.Bucket,
|
|
|
|
Name: dobj.ObjectName,
|
|
|
|
VersionID: dobj.VersionID,
|
|
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
|
|
},
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
})
|
|
|
|
if dobj.VersionID == "" {
|
|
|
|
rinfo.ReplicationStatus = replication.Failed
|
|
|
|
} else {
|
|
|
|
rinfo.VersionPurgeStatus = Failed
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// early return if already replicated delete marker for existing object replication
|
|
|
|
if dobj.DeleteMarkerVersionID != "" && dobj.OpType == replication.ExistingObjectReplicationType {
|
|
|
|
if _, err := tgt.StatObject(ctx, tgt.Bucket, dobj.ObjectName, miniogo.StatObjectOptions{
|
|
|
|
VersionID: versionID,
|
|
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
|
|
ReplicationProxyRequest: "false",
|
|
|
|
}}); isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
|
|
|
|
if dobj.VersionID == "" {
|
|
|
|
rinfo.ReplicationStatus = replication.Completed
|
|
|
|
} else {
|
|
|
|
rinfo.VersionPurgeStatus = Complete
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
|
|
|
|
VersionID: versionID,
|
|
|
|
Internal: miniogo.AdvancedRemoveOptions{
|
|
|
|
ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
|
|
|
|
ReplicationMTime: dobj.DeleteMarkerMTime.Time,
|
|
|
|
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
|
|
|
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
|
|
|
},
|
|
|
|
})
|
|
|
|
if rmErr != nil {
|
|
|
|
if dobj.VersionID == "" {
|
|
|
|
rinfo.ReplicationStatus = replication.Failed
|
|
|
|
} else {
|
|
|
|
rinfo.VersionPurgeStatus = Failed
|
|
|
|
}
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", tgt.Bucket, dobj.ObjectName, versionID, rmErr))
|
|
|
|
} else {
|
|
|
|
if dobj.VersionID == "" {
|
|
|
|
rinfo.ReplicationStatus = replication.Completed
|
|
|
|
} else {
|
|
|
|
rinfo.VersionPurgeStatus = Complete
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func getCopyObjMetadata(oi ObjectInfo, sc string) map[string]string {
|
2020-11-19 14:50:22 -05:00
|
|
|
meta := make(map[string]string, len(oi.UserDefined))
|
|
|
|
for k, v := range oi.UserDefined {
|
2021-02-03 23:41:33 -05:00
|
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
2020-11-19 14:50:22 -05:00
|
|
|
continue
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
|
|
|
if equals(k, xhttp.AmzBucketReplicationStatus) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
|
|
|
|
if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
|
2020-11-19 14:50:22 -05:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
meta[k] = v
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2020-11-19 14:50:22 -05:00
|
|
|
if oi.ContentEncoding != "" {
|
|
|
|
meta[xhttp.ContentEncoding] = oi.ContentEncoding
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2020-11-19 14:50:22 -05:00
|
|
|
if oi.ContentType != "" {
|
|
|
|
meta[xhttp.ContentType] = oi.ContentType
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
|
|
|
if oi.UserTags != "" {
|
|
|
|
meta[xhttp.AmzObjectTagging] = oi.UserTags
|
2020-11-19 14:50:22 -05:00
|
|
|
meta[xhttp.AmzTagDirective] = "REPLACE"
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2020-11-19 14:50:22 -05:00
|
|
|
if sc == "" {
|
|
|
|
sc = oi.StorageClass
|
|
|
|
}
|
2021-04-19 13:30:42 -04:00
|
|
|
// drop non standard storage classes for tiering from replication
|
|
|
|
if sc != "" && (sc == storageclass.RRS || sc == storageclass.STANDARD) {
|
2021-02-03 23:41:33 -05:00
|
|
|
meta[xhttp.AmzStorageClass] = sc
|
2020-11-19 14:50:22 -05:00
|
|
|
}
|
2021-04-19 13:30:42 -04:00
|
|
|
|
2020-11-19 14:50:22 -05:00
|
|
|
meta[xhttp.MinIOSourceETag] = oi.ETag
|
2021-02-03 23:41:33 -05:00
|
|
|
meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339Nano)
|
2020-11-19 14:50:22 -05:00
|
|
|
meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
|
|
|
|
return meta
|
|
|
|
}
|
|
|
|
|
2021-02-08 21:12:28 -05:00
|
|
|
// caseInsensitiveMap is a string map whose entries can be looked up without
// knowing the exact spelling used for the key.
type caseInsensitiveMap map[string]string

// Lookup returns the value stored under key, trying in order the key as
// given, its lower-case form and its canonical HTTP header form. The boolean
// is false when none of these spellings is present.
func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
	if len(m) == 0 {
		return "", false
	}
	if val, found := m[key]; found {
		return val, true
	}
	if val, found := m[strings.ToLower(key)]; found {
		return val, true
	}
	if val, found := m[http.CanonicalHeaderKey(key)]; found {
		return val, true
	}
	return "", false
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
|
2020-07-21 20:49:56 -04:00
|
|
|
meta := make(map[string]string)
|
|
|
|
for k, v := range objInfo.UserDefined {
|
2021-01-12 01:36:51 -05:00
|
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
2020-07-21 20:49:56 -04:00
|
|
|
continue
|
|
|
|
}
|
2021-01-12 01:36:51 -05:00
|
|
|
if isStandardHeader(k) {
|
2020-08-12 20:32:24 -04:00
|
|
|
continue
|
|
|
|
}
|
2020-07-21 20:49:56 -04:00
|
|
|
meta[k] = v
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2021-04-19 13:30:42 -04:00
|
|
|
if sc == "" && (objInfo.StorageClass == storageclass.STANDARD || objInfo.StorageClass == storageclass.RRS) {
|
2020-08-05 23:01:20 -04:00
|
|
|
sc = objInfo.StorageClass
|
|
|
|
}
|
2020-07-21 20:49:56 -04:00
|
|
|
putOpts = miniogo.PutObjectOptions{
|
2020-10-06 11:37:09 -04:00
|
|
|
UserMetadata: meta,
|
|
|
|
ContentType: objInfo.ContentType,
|
|
|
|
ContentEncoding: objInfo.ContentEncoding,
|
|
|
|
StorageClass: sc,
|
|
|
|
Internal: miniogo.AdvancedPutOptions{
|
2021-03-03 14:13:31 -05:00
|
|
|
SourceVersionID: objInfo.VersionID,
|
|
|
|
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
|
|
|
SourceMTime: objInfo.ModTime,
|
|
|
|
SourceETag: objInfo.ETag,
|
|
|
|
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
2020-10-06 11:37:09 -04:00
|
|
|
},
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
if objInfo.UserTags != "" {
|
|
|
|
tag, _ := tags.ParseObjectTags(objInfo.UserTags)
|
|
|
|
if tag != nil {
|
|
|
|
putOpts.UserTags = tag.ToMap()
|
2021-09-18 16:31:35 -04:00
|
|
|
// set tag timestamp in opts
|
|
|
|
tagTimestamp := objInfo.ModTime
|
|
|
|
if tagTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp]; ok {
|
|
|
|
tagTimestamp, err = time.Parse(time.RFC3339Nano, tagTmstampStr)
|
|
|
|
if err != nil {
|
|
|
|
return putOpts, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
putOpts.Internal.TaggingTimestamp = tagTimestamp
|
2021-02-03 23:41:33 -05:00
|
|
|
}
|
|
|
|
}
|
2021-02-08 21:12:28 -05:00
|
|
|
|
|
|
|
lkMap := caseInsensitiveMap(objInfo.UserDefined)
|
|
|
|
if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok {
|
2021-01-27 14:22:34 -05:00
|
|
|
putOpts.ContentLanguage = lang
|
|
|
|
}
|
2021-02-08 21:12:28 -05:00
|
|
|
if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok {
|
2021-01-27 14:22:34 -05:00
|
|
|
putOpts.ContentDisposition = disp
|
|
|
|
}
|
2021-02-08 21:12:28 -05:00
|
|
|
if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok {
|
2021-01-27 14:22:34 -05:00
|
|
|
putOpts.CacheControl = cc
|
|
|
|
}
|
2021-02-08 21:12:28 -05:00
|
|
|
if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok {
|
2020-07-21 20:49:56 -04:00
|
|
|
rmode := miniogo.RetentionMode(mode)
|
|
|
|
putOpts.Mode = rmode
|
|
|
|
}
|
2021-02-08 21:12:28 -05:00
|
|
|
if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
|
|
|
|
rdate, err := time.Parse(time.RFC3339, retainDateStr)
|
2020-07-21 20:49:56 -04:00
|
|
|
if err != nil {
|
2021-02-08 21:12:28 -05:00
|
|
|
return putOpts, err
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
|
|
|
putOpts.RetainUntilDate = rdate
|
2021-09-18 16:31:35 -04:00
|
|
|
// set retention timestamp in opts
|
|
|
|
retTimestamp := objInfo.ModTime
|
|
|
|
if retainTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp]; ok {
|
|
|
|
retTimestamp, err = time.Parse(time.RFC3339Nano, retainTmstampStr)
|
|
|
|
if err != nil {
|
|
|
|
return putOpts, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
putOpts.Internal.RetentionTimestamp = retTimestamp
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2021-02-08 21:12:28 -05:00
|
|
|
if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok {
|
2020-07-21 20:49:56 -04:00
|
|
|
putOpts.LegalHold = miniogo.LegalHoldStatus(lhold)
|
2021-09-18 16:31:35 -04:00
|
|
|
// set legalhold timestamp in opts
|
|
|
|
lholdTimestamp := objInfo.ModTime
|
|
|
|
if lholdTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp]; ok {
|
|
|
|
lholdTimestamp, err = time.Parse(time.RFC3339Nano, lholdTmstampStr)
|
|
|
|
if err != nil {
|
|
|
|
return putOpts, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
putOpts.Internal.LegalholdTimestamp = lholdTimestamp
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
|
|
|
if crypto.S3.IsEncrypted(objInfo.UserDefined) {
|
|
|
|
putOpts.ServerSideEncryption = encrypt.NewSSE()
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-11-19 14:50:22 -05:00
|
|
|
// replicationAction names how much of an object has to be pushed to a
// replication target.
type replicationAction string

// Possible outcomes of comparing a source object with its remote copy.
const (
	// replicateMetadata - only the object's metadata differs on the target.
	replicateMetadata replicationAction = "metadata"
	// replicateNone - source and target already agree, nothing to send.
	replicateNone replicationAction = "none"
	// replicateAll - the object needs full replication (data and metadata).
	replicateAll replicationAction = "all"
)
|
|
|
|
|
2021-02-03 23:41:33 -05:00
|
|
|
// equals reports whether k1 matches at least one of keys, ignoring case.
// It returns false when keys is empty.
func equals(k1 string, keys ...string) bool {
	for _, k2 := range keys {
		// EqualFold is case-insensitive on its own; lower-casing both
		// operands first only costs two extra allocations per comparison.
		if strings.EqualFold(k1, k2) {
			return true
		}
	}
	return false
}
|
|
|
|
|
2020-11-19 14:50:22 -05:00
|
|
|
// returns replicationAction by comparing metadata between source and target
|
|
|
|
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
|
|
|
|
// needs full replication
|
|
|
|
if oi1.ETag != oi2.ETag ||
|
|
|
|
oi1.VersionID != oi2.VersionID ||
|
|
|
|
oi1.Size != oi2.Size ||
|
2021-01-27 14:22:34 -05:00
|
|
|
oi1.DeleteMarker != oi2.IsDeleteMarker ||
|
2021-02-03 23:41:33 -05:00
|
|
|
oi1.ModTime.Unix() != oi2.LastModified.Unix() {
|
2020-11-19 14:50:22 -05:00
|
|
|
return replicateAll
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2021-01-27 14:22:34 -05:00
|
|
|
if oi1.ContentType != oi2.ContentType {
|
2020-11-19 14:50:22 -05:00
|
|
|
return replicateMetadata
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2020-11-19 14:50:22 -05:00
|
|
|
if oi1.ContentEncoding != "" {
|
2021-01-27 14:22:34 -05:00
|
|
|
enc, ok := oi2.Metadata[xhttp.ContentEncoding]
|
2021-02-03 23:41:33 -05:00
|
|
|
if !ok {
|
|
|
|
enc, ok = oi2.Metadata[strings.ToLower(xhttp.ContentEncoding)]
|
|
|
|
if !ok {
|
|
|
|
return replicateMetadata
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if strings.Join(enc, ",") != oi1.ContentEncoding {
|
2020-11-19 14:50:22 -05:00
|
|
|
return replicateMetadata
|
|
|
|
}
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
|
|
|
t, _ := tags.ParseObjectTags(oi1.UserTags)
|
|
|
|
if !reflect.DeepEqual(oi2.UserTags, t.ToMap()) {
|
2020-11-19 14:50:22 -05:00
|
|
|
return replicateMetadata
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
|
|
|
// Compare only necessary headers
|
|
|
|
compareKeys := []string{
|
|
|
|
"Expires",
|
|
|
|
"Cache-Control",
|
|
|
|
"Content-Language",
|
|
|
|
"Content-Disposition",
|
|
|
|
"X-Amz-Object-Lock-Mode",
|
|
|
|
"X-Amz-Object-Lock-Retain-Until-Date",
|
|
|
|
"X-Amz-Object-Lock-Legal-Hold",
|
|
|
|
"X-Amz-Website-Redirect-Location",
|
|
|
|
"X-Amz-Meta-",
|
|
|
|
}
|
|
|
|
|
|
|
|
// compare metadata on both maps to see if meta is identical
|
|
|
|
compareMeta1 := make(map[string]string)
|
|
|
|
for k, v := range oi1.UserDefined {
|
|
|
|
var found bool
|
|
|
|
for _, prefix := range compareKeys {
|
|
|
|
if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
found = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
if found {
|
|
|
|
compareMeta1[strings.ToLower(k)] = v
|
2021-01-27 14:22:34 -05:00
|
|
|
}
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
|
|
|
compareMeta2 := make(map[string]string)
|
|
|
|
for k, v := range oi2.Metadata {
|
|
|
|
var found bool
|
|
|
|
for _, prefix := range compareKeys {
|
|
|
|
if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
found = true
|
|
|
|
break
|
2021-01-27 14:22:34 -05:00
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
if found {
|
|
|
|
compareMeta2[strings.ToLower(k)] = strings.Join(v, ",")
|
2020-11-19 14:50:22 -05:00
|
|
|
}
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
|
|
|
if !reflect.DeepEqual(compareMeta1, compareMeta2) {
|
2020-11-19 14:50:22 -05:00
|
|
|
return replicateMetadata
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2020-11-19 14:50:22 -05:00
|
|
|
return replicateNone
|
|
|
|
}
|
|
|
|
|
2020-07-21 20:49:56 -04:00
|
|
|
// replicateObject replicates the specified version of the object to destination bucket
|
|
|
|
// The source object is then updated to reflect the replication status.
|
2021-07-01 17:02:44 -04:00
|
|
|
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, trigger string) {
|
|
|
|
var replicationStatus replication.StatusType
|
|
|
|
defer func() {
|
|
|
|
if replicationStatus.Empty() {
|
|
|
|
// replication status is empty means
|
|
|
|
// replication was not attempted for some
|
|
|
|
// reason, notify the state of the object
|
|
|
|
// on disk.
|
|
|
|
replicationStatus = ri.ReplicationStatus
|
|
|
|
}
|
|
|
|
auditLogInternal(ctx, ri.Bucket, ri.Name, AuditLogOptions{
|
|
|
|
Trigger: trigger,
|
|
|
|
APIName: ReplicateObjectAPI,
|
|
|
|
VersionID: ri.VersionID,
|
|
|
|
Status: replicationStatus.String(),
|
|
|
|
})
|
|
|
|
}()
|
|
|
|
|
2021-04-15 19:32:00 -04:00
|
|
|
objInfo := ri.ObjectInfo
|
2020-09-16 19:04:55 -04:00
|
|
|
bucket := objInfo.Bucket
|
|
|
|
object := objInfo.Name
|
|
|
|
|
2020-07-30 22:55:22 -04:00
|
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
2020-07-21 20:49:56 -04:00
|
|
|
if err != nil {
|
|
|
|
logger.LogIf(ctx, err)
|
2021-02-03 23:41:33 -05:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
2020-07-21 20:49:56 -04:00
|
|
|
return
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
tgtArns := cfg.FilterTargetArns(replication.ObjectOpts{
|
|
|
|
Name: object,
|
|
|
|
SSEC: crypto.SSEC.IsEncrypted(objInfo.UserDefined),
|
|
|
|
})
|
|
|
|
// Lock the object name before starting replication.
|
|
|
|
// Use separate lock that doesn't collide with regular objects.
|
|
|
|
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object)
|
|
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
|
|
if err != nil {
|
2021-02-03 23:41:33 -05:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
2021-09-18 16:31:35 -04:00
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn))
|
2020-07-21 20:49:56 -04:00
|
|
|
return
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
ctx = lkctx.Context()
|
|
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
var rinfos replicatedInfos
|
|
|
|
rinfos.Targets = make([]replicatedTargetInfo, len(tgtArns))
|
|
|
|
for i, tgtArn := range tgtArns {
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
|
|
|
|
if tgt == nil {
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn))
|
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
wg.Add(1)
|
|
|
|
go func(index int, tgt *TargetClient) {
|
|
|
|
defer wg.Done()
|
|
|
|
rinfos.Targets[index] = replicateObjectToTarget(ctx, ri, objectAPI, tgt)
|
|
|
|
}(i, tgt)
|
|
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// FIXME: add support for missing replication events
|
|
|
|
// - event.ObjectReplicationMissedThreshold
|
|
|
|
// - event.ObjectReplicationReplicatedAfterThreshold
|
|
|
|
var eventName = event.ObjectReplicationComplete
|
|
|
|
if rinfos.ReplicationStatus() == replication.Failed {
|
|
|
|
eventName = event.ObjectReplicationFailed
|
|
|
|
}
|
|
|
|
newReplStatusInternal := rinfos.ReplicationStatusInternal()
|
|
|
|
// Note that internal replication status(es) may match for previously replicated objects - in such cases
|
|
|
|
// metadata should be updated with last resync timestamp.
|
|
|
|
if objInfo.ReplicationStatusInternal != newReplStatusInternal || rinfos.ReplicationResynced() {
|
|
|
|
popts := ObjectOptions{
|
|
|
|
MTime: objInfo.ModTime,
|
|
|
|
VersionID: objInfo.VersionID,
|
|
|
|
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
|
|
|
|
}
|
|
|
|
for k, v := range objInfo.UserDefined {
|
|
|
|
popts.UserDefined[k] = v
|
|
|
|
}
|
|
|
|
popts.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = newReplStatusInternal
|
|
|
|
popts.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
|
|
|
|
popts.UserDefined[xhttp.AmzBucketReplicationStatus] = string(rinfos.ReplicationStatus())
|
|
|
|
for _, rinfo := range rinfos.Targets {
|
|
|
|
if rinfo.ResyncTimestamp != "" {
|
|
|
|
popts.UserDefined[targetResetHeader(rinfo.Arn)] = rinfo.ResyncTimestamp
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if objInfo.UserTags != "" {
|
|
|
|
popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
|
|
|
}
|
|
|
|
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w",
|
|
|
|
bucket, objInfo.Name, objInfo.VersionID, err))
|
|
|
|
}
|
|
|
|
opType := replication.MetadataReplicationType
|
|
|
|
if rinfos.Action() == replicateAll {
|
|
|
|
opType = replication.ObjectReplicationType
|
|
|
|
}
|
|
|
|
for _, rinfo := range rinfos.Targets {
|
|
|
|
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
|
|
|
|
globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: eventName,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
|
|
|
|
|
|
|
// re-queue failures once more - keep a retry count to avoid flooding the queue if
|
|
|
|
// the target site is down. Leave it to scanner to catch up instead.
|
|
|
|
if rinfos.ReplicationStatus() != replication.Completed && ri.RetryCount < 1 {
|
|
|
|
ri.OpType = replication.HealReplicationType
|
|
|
|
ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
|
|
|
|
ri.RetryCount++
|
|
|
|
globalReplicationPool.queueReplicaFailedTask(ri)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// replicateObjectToTarget replicates the specified version of the object to destination bucket
|
|
|
|
// The source object is then updated to reflect the replication status.
|
|
|
|
func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
|
2021-09-22 13:48:45 -04:00
|
|
|
objInfo := ri.ObjectInfo.Clone()
|
2021-09-18 16:31:35 -04:00
|
|
|
bucket := objInfo.Bucket
|
|
|
|
object := objInfo.Name
|
|
|
|
var (
|
|
|
|
closeOnDefer bool
|
|
|
|
gr *GetObjectReader
|
|
|
|
size int64
|
|
|
|
err error
|
|
|
|
)
|
|
|
|
sz, _ := objInfo.GetActualSize()
|
|
|
|
// set defaults for replication action based on operation being performed - actual
|
|
|
|
// replication action can only be determined after stat on remote. This default is
|
|
|
|
// needed for updating replication metrics correctly when target is offline.
|
|
|
|
var rAction replicationAction
|
|
|
|
switch ri.OpType {
|
|
|
|
case replication.MetadataReplicationType:
|
|
|
|
rAction = replicateMetadata
|
|
|
|
default:
|
|
|
|
rAction = replicateAll
|
|
|
|
}
|
|
|
|
rinfo = replicatedTargetInfo{
|
|
|
|
Size: sz,
|
|
|
|
Arn: tgt.ARN,
|
|
|
|
PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
|
|
|
|
ReplicationStatus: replication.Failed,
|
|
|
|
OpType: ri.OpType,
|
|
|
|
ReplicationAction: rAction,
|
|
|
|
}
|
|
|
|
if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
|
|
|
|
rinfo.ReplicationStatus = replication.Completed
|
|
|
|
rinfo.ReplicationResynced = true
|
2021-09-08 18:34:50 -04:00
|
|
|
return
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
if tgt.IsOffline() {
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN))
|
2021-08-23 11:16:18 -04:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
gr, err = objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
|
2020-09-16 19:04:55 -04:00
|
|
|
VersionID: objInfo.VersionID,
|
2020-09-15 23:44:48 -04:00
|
|
|
})
|
2020-07-21 20:49:56 -04:00
|
|
|
if err != nil {
|
2021-02-03 23:41:33 -05:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
2021-04-03 12:03:42 -04:00
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replicate for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
|
2020-07-21 20:49:56 -04:00
|
|
|
return
|
|
|
|
}
|
2021-06-29 02:58:08 -04:00
|
|
|
defer func() {
|
|
|
|
if closeOnDefer {
|
|
|
|
gr.Close()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
closeOnDefer = true
|
2021-02-03 23:41:33 -05:00
|
|
|
|
2020-09-16 19:04:55 -04:00
|
|
|
objInfo = gr.ObjInfo
|
2021-09-18 16:31:35 -04:00
|
|
|
size, err = objInfo.GetActualSize()
|
2020-07-21 20:49:56 -04:00
|
|
|
if err != nil {
|
|
|
|
logger.LogIf(ctx, err)
|
2021-02-03 23:41:33 -05:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
2020-07-21 20:49:56 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
if tgt.Bucket == "" {
|
2021-02-03 23:41:33 -05:00
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
|
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
2021-09-18 16:31:35 -04:00
|
|
|
return rinfo
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2020-09-16 19:04:55 -04:00
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
rAction = replicateAll
|
|
|
|
oi, cerr := tgt.StatObject(ctx, tgt.Bucket, object, miniogo.StatObjectOptions{
|
2021-01-27 14:22:34 -05:00
|
|
|
VersionID: objInfo.VersionID,
|
|
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
|
|
ReplicationProxyRequest: "false",
|
|
|
|
}})
|
2021-09-18 16:31:35 -04:00
|
|
|
if cerr == nil {
|
|
|
|
rAction = getReplicationAction(objInfo, oi)
|
|
|
|
rinfo.ReplicationStatus = replication.Completed
|
|
|
|
if rAction == replicateNone {
|
2020-07-21 20:49:56 -04:00
|
|
|
// object with same VersionID already exists, replication kicked off by
|
2021-04-03 12:03:42 -04:00
|
|
|
// PutObject might have completed
|
2021-09-18 16:31:35 -04:00
|
|
|
if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending || objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType {
|
2021-07-01 17:02:44 -04:00
|
|
|
// if metadata is not updated for some reason after replication, such as
|
|
|
|
// 503 encountered while updating metadata - make sure to set ReplicationStatus
|
|
|
|
// as Completed.
|
|
|
|
//
|
|
|
|
// Note: Replication Stats would have been updated despite metadata update failure.
|
2021-06-29 02:58:08 -04:00
|
|
|
gr.Close()
|
|
|
|
closeOnDefer = false
|
2021-09-18 16:31:35 -04:00
|
|
|
return replicatedTargetInfo{
|
|
|
|
ReplicationStatus: replication.Completed,
|
|
|
|
Size: sz,
|
|
|
|
Arn: tgt.ARN,
|
|
|
|
ReplicationAction: rAction,
|
|
|
|
PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
|
2021-04-29 19:46:26 -04:00
|
|
|
}
|
|
|
|
}
|
2020-07-21 20:49:56 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
rinfo.ReplicationStatus = replication.Completed
|
|
|
|
rinfo.Size = size
|
|
|
|
rinfo.ReplicationAction = rAction
|
|
|
|
defer func() {
|
|
|
|
if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
|
|
|
|
rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
|
|
|
|
rinfo.ReplicationResynced = true
|
|
|
|
}
|
|
|
|
}()
|
2021-02-20 03:22:17 -05:00
|
|
|
// use core client to avoid doing multipart on PUT
|
|
|
|
c := &miniogo.Core{Client: tgt.Client}
|
2021-09-18 16:31:35 -04:00
|
|
|
if rAction != replicateAll {
|
2020-11-19 14:50:22 -05:00
|
|
|
// replicate metadata for object tagging/copy with metadata replacement
|
2021-02-10 20:25:04 -05:00
|
|
|
srcOpts := miniogo.CopySrcOptions{
|
2021-09-18 16:31:35 -04:00
|
|
|
Bucket: tgt.Bucket,
|
2021-02-10 20:25:04 -05:00
|
|
|
Object: object,
|
2021-04-03 12:03:42 -04:00
|
|
|
VersionID: objInfo.VersionID,
|
|
|
|
}
|
2021-03-03 14:13:31 -05:00
|
|
|
dstOpts := miniogo.PutObjectOptions{
|
|
|
|
Internal: miniogo.AdvancedPutOptions{
|
|
|
|
SourceVersionID: objInfo.VersionID,
|
|
|
|
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
|
|
|
}}
|
2021-09-18 16:31:35 -04:00
|
|
|
if _, err = c.CopyObject(ctx, tgt.Bucket, object, tgt.Bucket, object, getCopyObjMetadata(objInfo, tgt.StorageClass), srcOpts, dstOpts); err != nil {
|
|
|
|
rinfo.ReplicationStatus = replication.Failed
|
2021-02-09 18:11:43 -05:00
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
2021-01-06 19:13:10 -05:00
|
|
|
}
|
|
|
|
} else {
|
2021-09-18 16:31:35 -04:00
|
|
|
var putOpts minio.PutObjectOptions
|
|
|
|
putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo)
|
2021-02-08 19:19:05 -05:00
|
|
|
if err != nil {
|
2021-09-18 16:31:35 -04:00
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s err:%w", bucket, err))
|
2021-02-08 21:12:28 -05:00
|
|
|
sendEvent(eventArgs{
|
|
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
|
|
BucketName: bucket,
|
|
|
|
Object: objInfo,
|
|
|
|
Host: "Internal: [Replication]",
|
|
|
|
})
|
2021-02-08 19:19:05 -05:00
|
|
|
return
|
|
|
|
}
|
2021-01-06 19:13:10 -05:00
|
|
|
var headerSize int
|
|
|
|
for k, v := range putOpts.Header() {
|
|
|
|
headerSize += len(k) + len(v)
|
|
|
|
}
|
2021-01-08 13:12:26 -05:00
|
|
|
|
2021-04-05 19:07:53 -04:00
|
|
|
opts := &bandwidth.MonitorReaderOptions{
|
2021-06-24 21:29:30 -04:00
|
|
|
Bucket: objInfo.Bucket,
|
|
|
|
HeaderSize: headerSize,
|
2021-04-05 19:07:53 -04:00
|
|
|
}
|
2021-07-28 18:20:01 -04:00
|
|
|
newCtx := ctx
|
|
|
|
if globalBucketMonitor.IsThrottled(bucket) {
|
|
|
|
var cancel context.CancelFunc
|
|
|
|
newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
|
|
|
|
defer cancel()
|
|
|
|
}
|
2021-06-24 21:29:30 -04:00
|
|
|
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
2021-09-09 01:25:23 -04:00
|
|
|
if objInfo.isMultipart() {
|
2021-09-18 16:31:35 -04:00
|
|
|
if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
|
2021-07-29 01:11:55 -04:00
|
|
|
r, objInfo, putOpts); err != nil {
|
2021-09-18 16:31:35 -04:00
|
|
|
rinfo.ReplicationStatus = replication.Failed
|
2021-06-30 10:44:24 -04:00
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
|
|
}
|
|
|
|
} else {
|
2021-09-18 16:31:35 -04:00
|
|
|
if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil {
|
|
|
|
rinfo.ReplicationStatus = replication.Failed
|
2021-06-30 10:44:24 -04:00
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
|
|
}
|
2021-01-06 19:13:10 -05:00
|
|
|
}
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2021-06-29 02:58:08 -04:00
|
|
|
gr.Close()
|
|
|
|
closeOnDefer = false
|
2021-09-18 16:31:35 -04:00
|
|
|
return
|
2020-07-21 20:49:56 -04:00
|
|
|
}
|
2020-08-12 20:32:24 -04:00
|
|
|
|
2021-07-29 01:11:55 -04:00
|
|
|
func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (err error) {
|
2021-06-30 10:44:24 -04:00
|
|
|
var uploadedParts []miniogo.CompletePart
|
2021-07-29 01:11:55 -04:00
|
|
|
uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts)
|
2021-06-30 10:44:24 -04:00
|
|
|
if err != nil {
|
2021-07-29 01:11:55 -04:00
|
|
|
return err
|
2021-06-30 10:44:24 -04:00
|
|
|
}
|
2021-07-29 01:11:55 -04:00
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if err != nil {
|
|
|
|
// block and abort remote upload upon failure.
|
|
|
|
if aerr := c.AbortMultipartUpload(ctx, bucket, object, uploadID); aerr != nil {
|
|
|
|
aerr = fmt.Errorf("Unable to cleanup failed multipart replication %s on remote %s/%s: %w", uploadID, bucket, object, aerr)
|
|
|
|
logger.LogIf(ctx, aerr)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2021-06-30 10:44:24 -04:00
|
|
|
var (
|
|
|
|
hr *hash.Reader
|
|
|
|
pInfo miniogo.ObjectPart
|
|
|
|
)
|
2021-07-29 01:11:55 -04:00
|
|
|
|
2021-06-30 10:44:24 -04:00
|
|
|
for _, partInfo := range objInfo.Parts {
|
2021-08-24 17:41:05 -04:00
|
|
|
hr, err = hash.NewReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
|
2021-06-30 10:44:24 -04:00
|
|
|
if err != nil {
|
2021-07-29 01:11:55 -04:00
|
|
|
return err
|
2021-06-30 10:44:24 -04:00
|
|
|
}
|
2021-08-24 17:41:05 -04:00
|
|
|
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.ActualSize, "", "", opts.ServerSideEncryption)
|
2021-06-30 10:44:24 -04:00
|
|
|
if err != nil {
|
2021-07-29 01:11:55 -04:00
|
|
|
return err
|
2021-06-30 10:44:24 -04:00
|
|
|
}
|
2021-08-24 17:41:05 -04:00
|
|
|
if pInfo.Size != partInfo.ActualSize {
|
|
|
|
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
|
2021-06-30 10:44:24 -04:00
|
|
|
}
|
|
|
|
uploadedParts = append(uploadedParts, miniogo.CompletePart{
|
|
|
|
PartNumber: pInfo.PartNumber,
|
|
|
|
ETag: pInfo.ETag,
|
|
|
|
})
|
|
|
|
}
|
2021-07-29 01:11:55 -04:00
|
|
|
_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{
|
|
|
|
Internal: miniogo.AdvancedPutOptions{
|
|
|
|
SourceMTime: objInfo.ModTime,
|
|
|
|
// always set this to distinguish between `mc mirror` replication and serverside
|
|
|
|
ReplicationRequest: true,
|
|
|
|
}})
|
|
|
|
return err
|
2021-06-30 10:44:24 -04:00
|
|
|
}
|
|
|
|
|
2020-08-12 20:32:24 -04:00
|
|
|
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
|
|
|
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
|
|
|
|
// Copy on write
|
|
|
|
dst := metadata
|
|
|
|
var copied bool
|
|
|
|
delKey := func(key string) {
|
|
|
|
if _, ok := metadata[key]; !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if !copied {
|
|
|
|
dst = make(map[string]string, len(metadata))
|
|
|
|
for k, v := range metadata {
|
|
|
|
dst[k] = v
|
|
|
|
}
|
|
|
|
copied = true
|
|
|
|
}
|
|
|
|
delete(dst, key)
|
|
|
|
}
|
|
|
|
|
|
|
|
delKey(xhttp.AmzBucketReplicationStatus)
|
|
|
|
return dst
|
|
|
|
}
|
2020-09-16 19:04:55 -04:00
|
|
|
|
2021-06-01 22:59:11 -04:00
|
|
|
// DeletedObjectReplicationInfo has info on deleted object
|
|
|
|
type DeletedObjectReplicationInfo struct {
|
2020-11-19 21:43:58 -05:00
|
|
|
DeletedObject
|
2021-09-18 16:31:35 -04:00
|
|
|
Bucket string
|
|
|
|
OpType replication.Type
|
|
|
|
ResetID string
|
|
|
|
TargetArn string
|
2020-11-19 21:43:58 -05:00
|
|
|
}
|
|
|
|
|
2021-07-01 17:02:44 -04:00
|
|
|
// API names recorded in audit log entries for replication activity.
const (
	// ReplicateObjectAPI is the API name used when an object is replicated.
	ReplicateObjectAPI = "ReplicateObject"
	// ReplicateDeleteAPI is the API name for delete replication.
	ReplicateDeleteAPI = "ReplicateDelete"
)
|
|
|
|
|
2021-06-29 02:58:08 -04:00
|
|
|
// Audit trail triggers describing what kicked off a replication operation.
const (
	// ReplicateQueued - audit trail for replication being queued.
	ReplicateQueued = "replicate:queue"

	// ReplicateExisting - audit trail for replication of existing objects.
	ReplicateExisting = "replicate:existing"
	// ReplicateExistingDelete - audit trail for delete replication triggered
	// for existing delete markers.
	ReplicateExistingDelete = "replicate:existing:delete"

	// ReplicateMRF - audit trail for replication from the Most Recent
	// Failures (MRF) queue.
	ReplicateMRF = "replicate:mrf"
	// ReplicateIncoming - audit trail indicating replication started
	// [could be from incoming/existing/heal activity].
	ReplicateIncoming = "replicate:incoming"
	// ReplicateHeal - audit trail for healing of failed/pending replications.
	ReplicateHeal = "replicate:heal"
	// ReplicateDelete - audit trail for delete replication.
	ReplicateDelete = "replicate:delete"
)
|
|
|
|
|
2020-09-21 16:43:29 -04:00
|
|
|
var (
|
2021-04-03 12:03:42 -04:00
|
|
|
globalReplicationPool *ReplicationPool
|
|
|
|
globalReplicationStats *ReplicationStats
|
2020-09-21 16:43:29 -04:00
|
|
|
)
|
2020-09-16 19:04:55 -04:00
|
|
|
|
2021-03-09 05:56:42 -05:00
|
|
|
// ReplicationPool describes replication pool
|
|
|
|
type ReplicationPool struct {
|
2021-06-01 22:59:11 -04:00
|
|
|
objLayer ObjectLayer
|
|
|
|
ctx context.Context
|
|
|
|
mrfWorkerKillCh chan struct{}
|
|
|
|
workerKillCh chan struct{}
|
|
|
|
replicaCh chan ReplicateObjectInfo
|
|
|
|
replicaDeleteCh chan DeletedObjectReplicationInfo
|
|
|
|
mrfReplicaCh chan ReplicateObjectInfo
|
|
|
|
existingReplicaCh chan ReplicateObjectInfo
|
|
|
|
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
|
|
|
|
workerSize int
|
|
|
|
mrfWorkerSize int
|
|
|
|
workerWg sync.WaitGroup
|
|
|
|
mrfWorkerWg sync.WaitGroup
|
|
|
|
once sync.Once
|
|
|
|
mu sync.Mutex
|
2021-03-09 05:56:42 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewReplicationPool creates a pool of replication workers of specified size
|
2021-04-24 00:58:45 -04:00
|
|
|
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
|
2021-03-09 05:56:42 -05:00
|
|
|
pool := &ReplicationPool{
|
2021-06-01 22:59:11 -04:00
|
|
|
replicaCh: make(chan ReplicateObjectInfo, 100000),
|
|
|
|
replicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
|
|
|
|
mrfReplicaCh: make(chan ReplicateObjectInfo, 100000),
|
|
|
|
workerKillCh: make(chan struct{}, opts.Workers),
|
|
|
|
mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers),
|
|
|
|
existingReplicaCh: make(chan ReplicateObjectInfo, 100000),
|
|
|
|
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
|
|
|
|
ctx: ctx,
|
|
|
|
objLayer: o,
|
2021-04-03 12:03:42 -04:00
|
|
|
}
|
2021-05-28 16:28:37 -04:00
|
|
|
|
2021-04-24 00:58:45 -04:00
|
|
|
pool.ResizeWorkers(opts.Workers)
|
|
|
|
pool.ResizeFailedWorkers(opts.FailedWorkers)
|
2021-06-01 22:59:11 -04:00
|
|
|
go pool.AddExistingObjectReplicateWorker()
|
2021-03-09 05:56:42 -05:00
|
|
|
return pool
|
2020-09-16 19:04:55 -04:00
|
|
|
}
|
|
|
|
|
2021-04-03 12:03:42 -04:00
|
|
|
// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
|
|
|
|
// to the other workers
|
|
|
|
func (p *ReplicationPool) AddMRFWorker() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-p.ctx.Done():
|
|
|
|
return
|
|
|
|
case oi, ok := <-p.mrfReplicaCh:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
2021-07-01 17:02:44 -04:00
|
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateMRF)
|
2021-05-28 16:28:37 -04:00
|
|
|
case <-p.mrfWorkerKillCh:
|
|
|
|
return
|
2021-04-03 12:03:42 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-09 05:56:42 -05:00
|
|
|
// AddWorker adds a replication worker to the pool
|
|
|
|
func (p *ReplicationPool) AddWorker() {
|
2021-04-24 00:58:45 -04:00
|
|
|
defer p.workerWg.Done()
|
2021-03-09 05:56:42 -05:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-p.ctx.Done():
|
|
|
|
return
|
|
|
|
case oi, ok := <-p.replicaCh:
|
|
|
|
if !ok {
|
2020-09-16 19:04:55 -04:00
|
|
|
return
|
|
|
|
}
|
2021-07-01 17:02:44 -04:00
|
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateIncoming)
|
2021-03-09 05:56:42 -05:00
|
|
|
case doi, ok := <-p.replicaDeleteCh:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
2021-07-01 17:02:44 -04:00
|
|
|
replicateDelete(p.ctx, doi, p.objLayer, ReplicateDelete)
|
2021-04-24 00:58:45 -04:00
|
|
|
case <-p.workerKillCh:
|
2021-03-09 05:56:42 -05:00
|
|
|
return
|
2020-09-16 19:04:55 -04:00
|
|
|
}
|
2021-03-09 05:56:42 -05:00
|
|
|
}
|
|
|
|
|
2020-09-16 19:04:55 -04:00
|
|
|
}
|
2020-09-21 16:43:29 -04:00
|
|
|
|
2021-06-01 22:59:11 -04:00
|
|
|
// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd
|
|
|
|
func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-p.ctx.Done():
|
|
|
|
return
|
|
|
|
case oi, ok := <-p.existingReplicaCh:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
2021-07-01 17:02:44 -04:00
|
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateExisting)
|
2021-06-01 22:59:11 -04:00
|
|
|
case doi, ok := <-p.existingReplicaDeleteCh:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
2021-07-01 17:02:44 -04:00
|
|
|
replicateDelete(p.ctx, doi, p.objLayer, ReplicateExistingDelete)
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-24 00:58:45 -04:00
|
|
|
// ResizeWorkers sets replication workers pool to new size
|
|
|
|
func (p *ReplicationPool) ResizeWorkers(n int) {
|
2021-03-09 05:56:42 -05:00
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
|
|
|
|
2021-04-24 00:58:45 -04:00
|
|
|
for p.workerSize < n {
|
|
|
|
p.workerSize++
|
|
|
|
p.workerWg.Add(1)
|
2021-03-09 05:56:42 -05:00
|
|
|
go p.AddWorker()
|
|
|
|
}
|
2021-04-24 00:58:45 -04:00
|
|
|
for p.workerSize > n {
|
|
|
|
p.workerSize--
|
|
|
|
go func() { p.workerKillCh <- struct{}{} }()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ResizeFailedWorkers sets replication failed workers pool size
|
|
|
|
func (p *ReplicationPool) ResizeFailedWorkers(n int) {
|
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
|
|
|
|
|
|
|
for p.mrfWorkerSize < n {
|
|
|
|
p.mrfWorkerSize++
|
|
|
|
p.mrfWorkerWg.Add(1)
|
|
|
|
go p.AddMRFWorker()
|
|
|
|
}
|
|
|
|
for p.mrfWorkerSize > n {
|
|
|
|
p.mrfWorkerSize--
|
|
|
|
go func() { p.mrfWorkerKillCh <- struct{}{} }()
|
2021-03-09 05:56:42 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-29 21:20:39 -04:00
|
|
|
func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
|
2021-03-09 05:56:42 -05:00
|
|
|
if p == nil {
|
2020-09-21 16:43:29 -04:00
|
|
|
return
|
|
|
|
}
|
2021-03-09 05:56:42 -05:00
|
|
|
select {
|
2021-04-29 21:20:39 -04:00
|
|
|
case <-GlobalContext.Done():
|
2021-04-16 17:09:25 -04:00
|
|
|
p.once.Do(func() {
|
|
|
|
close(p.replicaCh)
|
|
|
|
close(p.mrfReplicaCh)
|
2021-06-01 22:59:11 -04:00
|
|
|
close(p.existingReplicaCh)
|
2021-04-16 17:09:25 -04:00
|
|
|
})
|
2021-04-15 19:32:00 -04:00
|
|
|
case p.mrfReplicaCh <- ri:
|
2021-03-09 05:56:42 -05:00
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
2020-09-21 16:43:29 -04:00
|
|
|
|
2021-04-29 21:20:39 -04:00
|
|
|
func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
|
|
|
|
if p == nil {
|
|
|
|
return
|
|
|
|
}
|
2021-06-01 22:59:11 -04:00
|
|
|
var ch chan ReplicateObjectInfo
|
|
|
|
switch ri.OpType {
|
|
|
|
case replication.ExistingObjectReplicationType:
|
|
|
|
ch = p.existingReplicaCh
|
2021-06-29 02:58:08 -04:00
|
|
|
case replication.HealReplicationType:
|
2021-07-01 17:02:44 -04:00
|
|
|
fallthrough
|
2021-06-01 22:59:11 -04:00
|
|
|
default:
|
|
|
|
ch = p.replicaCh
|
|
|
|
}
|
2021-04-29 21:20:39 -04:00
|
|
|
select {
|
|
|
|
case <-GlobalContext.Done():
|
|
|
|
p.once.Do(func() {
|
|
|
|
close(p.replicaCh)
|
|
|
|
close(p.mrfReplicaCh)
|
2021-06-01 22:59:11 -04:00
|
|
|
close(p.existingReplicaCh)
|
2021-04-29 21:20:39 -04:00
|
|
|
})
|
2021-06-01 22:59:11 -04:00
|
|
|
case ch <- ri:
|
2021-04-29 21:20:39 -04:00
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObjectResync ResyncDecision) {
|
|
|
|
for k, v := range existingObjectResync.targets {
|
|
|
|
if v.Replicate {
|
|
|
|
doi.ResetID = v.ResetID
|
|
|
|
doi.TargetArn = k
|
|
|
|
|
|
|
|
globalReplicationPool.queueReplicaDeleteTask(doi)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-01 22:59:11 -04:00
|
|
|
func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInfo) {
|
2021-03-09 05:56:42 -05:00
|
|
|
if p == nil {
|
|
|
|
return
|
2020-09-21 16:43:29 -04:00
|
|
|
}
|
2021-06-01 22:59:11 -04:00
|
|
|
var ch chan DeletedObjectReplicationInfo
|
|
|
|
switch doi.OpType {
|
|
|
|
case replication.ExistingObjectReplicationType:
|
|
|
|
ch = p.existingReplicaDeleteCh
|
2021-06-29 02:58:08 -04:00
|
|
|
case replication.HealReplicationType:
|
2021-07-01 17:02:44 -04:00
|
|
|
fallthrough
|
2021-06-01 22:59:11 -04:00
|
|
|
default:
|
|
|
|
ch = p.replicaDeleteCh
|
|
|
|
}
|
|
|
|
|
2021-03-09 05:56:42 -05:00
|
|
|
select {
|
2021-04-29 21:20:39 -04:00
|
|
|
case <-GlobalContext.Done():
|
2021-04-16 17:09:25 -04:00
|
|
|
p.once.Do(func() {
|
|
|
|
close(p.replicaDeleteCh)
|
2021-06-01 22:59:11 -04:00
|
|
|
close(p.existingReplicaDeleteCh)
|
2021-04-16 17:09:25 -04:00
|
|
|
})
|
2021-06-01 22:59:11 -04:00
|
|
|
case ch <- doi:
|
2021-03-09 05:56:42 -05:00
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-24 00:58:45 -04:00
|
|
|
// replicationPoolOpts carries the sizing options handed to NewReplicationPool.
type replicationPoolOpts struct {
	// Workers is filled from globalAPIConfig.getReplicationWorkers() by
	// initBackgroundReplication.
	Workers int
	// FailedWorkers is filled from globalAPIConfig.getReplicationFailedWorkers()
	// by initBackgroundReplication.
	FailedWorkers int
}
|
|
|
|
|
2021-03-09 05:56:42 -05:00
|
|
|
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
|
2021-04-24 00:58:45 -04:00
|
|
|
globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
|
|
|
|
Workers: globalAPIConfig.getReplicationWorkers(),
|
|
|
|
FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(),
|
|
|
|
})
|
2021-04-03 12:03:42 -04:00
|
|
|
globalReplicationStats = NewReplicationStats(ctx, objectAPI)
|
2020-09-21 16:43:29 -04:00
|
|
|
}
|
2021-01-12 01:36:51 -05:00
|
|
|
|
|
|
|
// get Reader from replication target if active-active replication is in place and
|
|
|
|
// this node returns a 404
|
2021-09-18 16:31:35 -04:00
|
|
|
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (gr *GetObjectReader, proxy bool) {
|
|
|
|
tgt, oi, proxy := proxyHeadToRepTarget(ctx, bucket, object, opts, proxyTargets)
|
|
|
|
if !proxy {
|
2021-01-12 01:36:51 -05:00
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
fn, off, length, err := NewGetObjectReader(rs, oi, opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
gopts := miniogo.GetObjectOptions{
|
|
|
|
VersionID: opts.VersionID,
|
|
|
|
ServerSideEncryption: opts.ServerSideEncryption,
|
|
|
|
Internal: miniogo.AdvancedGetOptions{
|
2021-01-27 14:22:34 -05:00
|
|
|
ReplicationProxyRequest: "true",
|
2021-01-12 01:36:51 -05:00
|
|
|
},
|
|
|
|
}
|
|
|
|
// get correct offsets for encrypted object
|
|
|
|
if off >= 0 && length >= 0 {
|
|
|
|
if err := gopts.SetRange(off, off+length-1); err != nil {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
}
|
2021-02-03 23:41:33 -05:00
|
|
|
// Make sure to match ETag when proxying.
|
|
|
|
if err = gopts.SetMatchETag(oi.ETag); err != nil {
|
|
|
|
return nil, false
|
|
|
|
}
|
2021-01-12 01:36:51 -05:00
|
|
|
c := miniogo.Core{Client: tgt.Client}
|
|
|
|
obj, _, _, err := c.GetObject(ctx, bucket, object, gopts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
closeReader := func() { obj.Close() }
|
|
|
|
|
2021-06-24 12:44:00 -04:00
|
|
|
reader, err := fn(obj, h, closeReader)
|
2021-01-12 01:36:51 -05:00
|
|
|
if err != nil {
|
|
|
|
return nil, false
|
|
|
|
}
|
2021-02-10 20:25:04 -05:00
|
|
|
reader.ObjInfo = oi.Clone()
|
2021-01-12 01:36:51 -05:00
|
|
|
return reader, true
|
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
func getproxyTargets(ctx context.Context, bucket, object string, opts ObjectOptions) (tgts *madmin.BucketTargets) {
|
2021-01-12 01:36:51 -05:00
|
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
2021-09-18 16:31:35 -04:00
|
|
|
if err != nil || cfg == nil {
|
|
|
|
return &madmin.BucketTargets{}
|
|
|
|
}
|
|
|
|
topts := replication.ObjectOpts{Name: object}
|
|
|
|
tgtArns := cfg.FilterTargetArns(topts)
|
|
|
|
tgts = &madmin.BucketTargets{Targets: make([]madmin.BucketTarget, len(tgtArns))}
|
|
|
|
for i, tgtArn := range tgtArns {
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArn)
|
|
|
|
tgts.Targets[i] = tgt
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
|
|
|
|
return tgts
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
2021-01-27 14:22:34 -05:00
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgt *TargetClient, oi ObjectInfo, proxy bool) {
|
2021-01-12 01:36:51 -05:00
|
|
|
// this option is set when active-active replication is in place between site A -> B,
|
|
|
|
// and site B does not have the object yet.
|
2021-01-27 14:22:34 -05:00
|
|
|
if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
|
2021-09-18 16:31:35 -04:00
|
|
|
return nil, oi, false
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
for _, t := range proxyTargets.Targets {
|
|
|
|
tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, t.Arn)
|
|
|
|
if tgt == nil || tgt.IsOffline() {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
// if proxying explicitly disabled on remote target
|
|
|
|
if tgt.disableProxy {
|
|
|
|
continue
|
|
|
|
}
|
2021-01-12 01:36:51 -05:00
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
gopts := miniogo.GetObjectOptions{
|
|
|
|
VersionID: opts.VersionID,
|
|
|
|
ServerSideEncryption: opts.ServerSideEncryption,
|
|
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
|
|
ReplicationProxyRequest: "true",
|
|
|
|
},
|
|
|
|
}
|
|
|
|
objInfo, err := tgt.StatObject(ctx, t.TargetBucket, object, gopts)
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
tags, _ := tags.MapToObjectTags(objInfo.UserTags)
|
|
|
|
oi = ObjectInfo{
|
|
|
|
Bucket: bucket,
|
|
|
|
Name: object,
|
|
|
|
ModTime: objInfo.LastModified,
|
|
|
|
Size: objInfo.Size,
|
|
|
|
ETag: objInfo.ETag,
|
|
|
|
VersionID: objInfo.VersionID,
|
|
|
|
IsLatest: objInfo.IsLatest,
|
|
|
|
DeleteMarker: objInfo.IsDeleteMarker,
|
|
|
|
ContentType: objInfo.ContentType,
|
|
|
|
Expires: objInfo.Expires,
|
|
|
|
StorageClass: objInfo.StorageClass,
|
|
|
|
ReplicationStatusInternal: objInfo.ReplicationStatus,
|
|
|
|
UserTags: tags.String(),
|
|
|
|
}
|
|
|
|
oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
|
|
|
|
for k, v := range objInfo.Metadata {
|
|
|
|
oi.UserDefined[k] = v[0]
|
|
|
|
}
|
|
|
|
ce, ok := oi.UserDefined[xhttp.ContentEncoding]
|
|
|
|
if !ok {
|
|
|
|
ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
|
|
|
|
}
|
|
|
|
if ok {
|
|
|
|
oi.ContentEncoding = ce
|
|
|
|
}
|
|
|
|
return tgt, oi, true
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
return nil, oi, false
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
// get object info from replication target if active-active replication is in place and
|
|
|
|
// this node returns a 404
|
2021-09-18 16:31:35 -04:00
|
|
|
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (oi ObjectInfo, proxy bool) {
|
|
|
|
_, oi, proxy = proxyHeadToRepTarget(ctx, bucket, object, opts, proxyTargets)
|
|
|
|
return oi, proxy
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) {
|
|
|
|
if dsc.Synchronous() {
|
|
|
|
replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc}, o, ReplicateIncoming)
|
2021-01-12 01:36:51 -05:00
|
|
|
} else {
|
2021-09-18 16:31:35 -04:00
|
|
|
globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc})
|
2021-04-03 12:03:42 -04:00
|
|
|
}
|
|
|
|
if sz, err := objInfo.GetActualSize(); err == nil {
|
2021-09-18 16:31:35 -04:00
|
|
|
for arn := range dsc.targetsMap {
|
|
|
|
globalReplicationStats.Update(objInfo.Bucket, arn, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
|
|
|
|
}
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
|
2021-04-29 21:20:39 -04:00
|
|
|
globalReplicationPool.queueReplicaDeleteTask(dv)
|
2021-09-18 16:31:35 -04:00
|
|
|
for arn := range dv.ReplicationState.Targets {
|
|
|
|
globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
|
|
|
|
}
|
|
|
|
for arn := range dv.ReplicationState.PurgeTargets {
|
|
|
|
globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
|
|
|
|
}
|
2021-01-12 01:36:51 -05:00
|
|
|
}
|
2021-06-01 22:59:11 -04:00
|
|
|
|
|
|
|
type replicationConfig struct {
|
2021-09-18 16:31:35 -04:00
|
|
|
Config *replication.Config
|
|
|
|
remotes *madmin.BucketTargets
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c replicationConfig) Empty() bool {
|
|
|
|
return c.Config == nil
|
|
|
|
}
|
|
|
|
func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
|
|
|
|
return c.Config.Replicate(opts)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Resync returns true if replication reset is requested
|
2021-09-18 16:31:35 -04:00
|
|
|
func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
|
2021-06-01 22:59:11 -04:00
|
|
|
if c.Empty() {
|
2021-09-18 16:31:35 -04:00
|
|
|
return
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
|
|
|
// existing object replication does not apply to un-versioned objects
|
|
|
|
if oi.VersionID == "" || oi.VersionID == nullVersionID {
|
2021-09-18 16:31:35 -04:00
|
|
|
return
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
|
|
|
|
2021-09-18 16:31:35 -04:00
|
|
|
// Now overlay existing object replication choices for target
|
2021-06-01 22:59:11 -04:00
|
|
|
if oi.DeleteMarker {
|
2021-09-18 16:31:35 -04:00
|
|
|
opts := replication.ObjectOpts{
|
2021-06-01 22:59:11 -04:00
|
|
|
Name: oi.Name,
|
|
|
|
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
|
|
|
|
UserTags: oi.UserTags,
|
|
|
|
DeleteMarker: oi.DeleteMarker,
|
|
|
|
VersionID: oi.VersionID,
|
|
|
|
OpType: replication.DeleteReplicationType,
|
2021-09-18 16:31:35 -04:00
|
|
|
ExistingObject: true}
|
|
|
|
|
|
|
|
tgtArns := c.Config.FilterTargetArns(opts)
|
|
|
|
// indicates no matching target with Existing object replication enabled.
|
|
|
|
if len(tgtArns) == 0 {
|
|
|
|
return
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
for _, t := range tgtArns {
|
|
|
|
opts.TargetArn = t
|
|
|
|
// Update replication decision for target based on existing object replciation rule.
|
|
|
|
dsc.Set(newReplicateTargetDecision(t, c.Replicate(opts), false))
|
|
|
|
}
|
|
|
|
return c.resync(oi, dsc, tgtStatuses)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ignore previous replication status when deciding if object can be re-replicated
|
|
|
|
objInfo := oi.Clone()
|
|
|
|
objInfo.ReplicationStatusInternal = ""
|
|
|
|
objInfo.VersionPurgeStatusInternal = ""
|
|
|
|
objInfo.ReplicationStatus = ""
|
|
|
|
objInfo.VersionPurgeStatus = ""
|
|
|
|
resyncdsc := mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(objInfo, replication.ExistingObjectReplicationType, ObjectOptions{}))
|
|
|
|
dsc = &resyncdsc
|
|
|
|
return c.resync(oi, dsc, tgtStatuses)
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
// wrapper function for testability. Returns true if a new reset is requested on
|
|
|
|
// already replicated objects OR object qualifies for existing object replication
|
|
|
|
// and no reset requested.
|
2021-09-18 16:31:35 -04:00
|
|
|
func (c replicationConfig) resync(oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
|
|
|
|
r = ResyncDecision{
|
|
|
|
targets: make(map[string]ResyncTargetDecision),
|
|
|
|
}
|
|
|
|
if c.remotes == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
for _, tgt := range c.remotes.Targets {
|
|
|
|
d, ok := dsc.targetsMap[tgt.Arn]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if !d.Replicate {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
r.targets[d.Arn] = resyncTarget(oi, tgt.Arn, tgt.ResetID, tgt.ResetBeforeDate, tgtStatuses[tgt.Arn])
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func targetResetHeader(arn string) string {
|
|
|
|
return fmt.Sprintf("%s-%s", ReservedMetadataPrefixLower+ReplicationReset, arn)
|
|
|
|
}
|
|
|
|
|
|
|
|
func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate time.Time, tgtStatus replication.StatusType) (rd ResyncTargetDecision) {
|
|
|
|
rd = ResyncTargetDecision{
|
|
|
|
ResetID: resetID,
|
|
|
|
ResetBeforeDate: resetBeforeDate,
|
|
|
|
}
|
|
|
|
rs, ok := oi.UserDefined[targetResetHeader(arn)]
|
|
|
|
if !ok {
|
|
|
|
rs, ok = oi.UserDefined[xhttp.MinIOReplicationResetStatus] //for backward compatibility
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
|
|
|
if !ok { // existing object replication is enabled and object version is unreplicated so far.
|
2021-09-18 16:31:35 -04:00
|
|
|
if resetID != "" && oi.ModTime.Before(resetBeforeDate) { // trigger replication if `mc replicate reset` requested
|
|
|
|
rd.Replicate = true
|
|
|
|
return
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
// For existing object reset - this condition is needed
|
|
|
|
rd.Replicate = tgtStatus == ""
|
|
|
|
return
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
if resetID == "" || resetBeforeDate.Equal(timeSentinel) { // no reset in progress
|
|
|
|
return
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
|
2021-06-01 22:59:11 -04:00
|
|
|
// if already replicated, return true if a new reset was requested.
|
|
|
|
splits := strings.SplitN(rs, ";", 2)
|
2021-09-18 16:31:35 -04:00
|
|
|
if len(splits) != 2 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
newReset := splits[1] != resetID
|
|
|
|
if !newReset && tgtStatus == replication.Completed {
|
2021-06-01 22:59:11 -04:00
|
|
|
// already replicated and no reset requested
|
2021-09-18 16:31:35 -04:00
|
|
|
return
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|
2021-09-18 16:31:35 -04:00
|
|
|
rd.Replicate = newReset && oi.ModTime.Before(resetBeforeDate)
|
|
|
|
return
|
2021-06-01 22:59:11 -04:00
|
|
|
}
|