mirror of
https://github.com/minio/minio.git
synced 2024-12-26 07:05:55 -05:00
0892f1e406
Replication was not working properly for encrypted objects uploaded with a single PUT because the ETag was not preserved. We need to preserve the ETag so that replication works properly and does not get into infinite copy loops caused by ETag mismatches.
1488 lines
46 KiB
Go
1488 lines
46 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/madmin-go"
|
|
"github.com/minio/minio-go/v7"
|
|
miniogo "github.com/minio/minio-go/v7"
|
|
"github.com/minio/minio-go/v7/pkg/encrypt"
|
|
"github.com/minio/minio-go/v7/pkg/tags"
|
|
"github.com/minio/minio/internal/bucket/bandwidth"
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
"github.com/minio/minio/internal/config/storageclass"
|
|
"github.com/minio/minio/internal/crypto"
|
|
"github.com/minio/minio/internal/event"
|
|
"github.com/minio/minio/internal/hash"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
"github.com/minio/minio/internal/logger"
|
|
)
|
|
|
|
const throttleDeadline = 1 * time.Hour
|
|
|
|
// gets replication config associated to a given bucket name.
|
|
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
|
|
if globalIsGateway {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return nil, errServerNotInitialized
|
|
}
|
|
|
|
return nil, BucketReplicationConfigNotFound{Bucket: bucketName}
|
|
}
|
|
|
|
return globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
|
|
}
|
|
|
|
// validateReplicationDestination returns error if replication destination bucket missing or not configured
|
|
// It also returns true if replication destination is same as this server.
|
|
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) {
|
|
arn, err := madmin.ParseARN(rCfg.RoleArn)
|
|
if err != nil {
|
|
return false, BucketRemoteArnInvalid{}
|
|
}
|
|
if arn.Type != madmin.ReplicationService {
|
|
return false, BucketRemoteArnTypeInvalid{}
|
|
}
|
|
clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rCfg.RoleArn)
|
|
if clnt == nil {
|
|
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
|
}
|
|
if found, _ := clnt.BucketExists(ctx, rCfg.GetDestination().Bucket); !found {
|
|
return false, BucketRemoteDestinationNotFound{Bucket: rCfg.GetDestination().Bucket}
|
|
}
|
|
if ret, err := globalBucketObjectLockSys.Get(bucket); err == nil {
|
|
if ret.LockEnabled {
|
|
lock, _, _, _, err := clnt.GetObjectLockConfig(ctx, rCfg.GetDestination().Bucket)
|
|
if err != nil || lock != "Enabled" {
|
|
return false, BucketReplicationDestinationMissingLock{Bucket: rCfg.GetDestination().Bucket}
|
|
}
|
|
}
|
|
}
|
|
// validate replication ARN against target endpoint
|
|
c, ok := globalBucketTargetSys.arnRemotesMap[rCfg.RoleArn]
|
|
if ok {
|
|
if c.EndpointURL().String() == clnt.EndpointURL().String() {
|
|
sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
|
|
return sameTarget, nil
|
|
}
|
|
}
|
|
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
|
}
|
|
|
|
type mustReplicateOptions struct {
|
|
meta map[string]string
|
|
status replication.StatusType
|
|
opType replication.Type
|
|
}
|
|
|
|
func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
|
|
if rs, ok := o.meta[xhttp.AmzBucketReplicationStatus]; ok {
|
|
return replication.StatusType(rs)
|
|
}
|
|
return s
|
|
}
|
|
func (o mustReplicateOptions) isExistingObjectReplication() bool {
|
|
return o.opType == replication.ExistingObjectReplicationType
|
|
}
|
|
|
|
func (o mustReplicateOptions) isMetadataReplication() bool {
|
|
return o.opType == replication.MetadataReplicationType
|
|
}
|
|
func getMustReplicateOptions(o ObjectInfo, op replication.Type) mustReplicateOptions {
|
|
if !op.Valid() {
|
|
op = replication.ObjectReplicationType
|
|
if o.metadataOnly {
|
|
op = replication.MetadataReplicationType
|
|
}
|
|
}
|
|
meta := cloneMSS(o.UserDefined)
|
|
if o.UserTags != "" {
|
|
meta[xhttp.AmzObjectTagging] = o.UserTags
|
|
}
|
|
return mustReplicateOptions{
|
|
meta: meta,
|
|
status: o.ReplicationStatus,
|
|
opType: op,
|
|
}
|
|
}
|
|
|
|
// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
|
|
// a synchronous manner.
|
|
func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (replicate bool, sync bool) {
|
|
if globalIsGateway {
|
|
return replicate, sync
|
|
}
|
|
replStatus := mopts.ReplicationStatus()
|
|
if replStatus == replication.Replica && !mopts.isMetadataReplication() {
|
|
return replicate, sync
|
|
}
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
return replicate, sync
|
|
}
|
|
opts := replication.ObjectOpts{
|
|
Name: object,
|
|
SSEC: crypto.SSEC.IsEncrypted(mopts.meta),
|
|
Replica: replStatus == replication.Replica,
|
|
ExistingObject: mopts.isExistingObjectReplication(),
|
|
}
|
|
tagStr, ok := mopts.meta[xhttp.AmzObjectTagging]
|
|
if ok {
|
|
opts.UserTags = tagStr
|
|
}
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
|
|
// the target online status should not be used here while deciding
|
|
// whether to replicate as the target could be temporarily down
|
|
if tgt != nil {
|
|
return cfg.Replicate(opts), tgt.replicateSync
|
|
}
|
|
return cfg.Replicate(opts), false
|
|
}
|
|
|
|
// Standard headers that needs to be extracted from User metadata.
|
|
var standardHeaders = []string{
|
|
xhttp.ContentType,
|
|
xhttp.CacheControl,
|
|
xhttp.ContentEncoding,
|
|
xhttp.ContentLanguage,
|
|
xhttp.ContentDisposition,
|
|
xhttp.AmzStorageClass,
|
|
xhttp.AmzObjectTagging,
|
|
xhttp.AmzBucketReplicationStatus,
|
|
xhttp.AmzObjectLockMode,
|
|
xhttp.AmzObjectLockRetainUntilDate,
|
|
xhttp.AmzObjectLockLegalHold,
|
|
xhttp.AmzTagCount,
|
|
xhttp.AmzServerSideEncryption,
|
|
}
|
|
|
|
// returns true if any of the objects being deleted qualifies for replication.
|
|
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
|
|
c, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || c == nil {
|
|
return false
|
|
}
|
|
for _, obj := range objects {
|
|
if c.HasActiveRules(obj.ObjectName, true) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// isStandardHeader returns true if header is a supported header and not a custom header
|
|
func isStandardHeader(matchHeaderKey string) bool {
|
|
return equals(matchHeaderKey, standardHeaders...)
|
|
}
|
|
|
|
// returns whether object version is a deletemarker and if object qualifies for replication
|
|
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (replicate, sync bool) {
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || rcfg == nil {
|
|
return false, sync
|
|
}
|
|
opts := replication.ObjectOpts{
|
|
Name: dobj.ObjectName,
|
|
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
|
|
UserTags: oi.UserTags,
|
|
DeleteMarker: oi.DeleteMarker,
|
|
VersionID: dobj.VersionID,
|
|
OpType: replication.DeleteReplicationType,
|
|
}
|
|
replicate = rcfg.Replicate(opts)
|
|
// when incoming delete is removal of a delete marker( a.k.a versioned delete),
|
|
// GetObjectInfo returns extra information even though it returns errFileNotFound
|
|
if gerr != nil {
|
|
validReplStatus := false
|
|
switch oi.ReplicationStatus {
|
|
case replication.Pending, replication.Completed, replication.Failed:
|
|
validReplStatus = true
|
|
}
|
|
if oi.DeleteMarker && (validReplStatus || replicate) {
|
|
return true, sync
|
|
}
|
|
// can be the case that other cluster is down and duplicate `mc rm --vid`
|
|
// is issued - this still needs to be replicated back to the other target
|
|
return oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
|
|
}
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
|
|
// the target online status should not be used here while deciding
|
|
// whether to replicate deletes as the target could be temporarily down
|
|
if tgt == nil {
|
|
return false, false
|
|
}
|
|
return replicate, tgt.replicateSync
|
|
}
|
|
|
|
// replicate deletes to the designated replication target if replication configuration
|
|
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
|
|
// is specified) enabled.
|
|
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
|
|
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
|
|
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
|
|
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
|
|
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
|
|
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
|
|
// on target.
|
|
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, trigger string) {
|
|
var versionPurgeStatus VersionPurgeStatusType
|
|
var replicationStatus string
|
|
|
|
bucket := dobj.Bucket
|
|
versionID := dobj.DeleteMarkerVersionID
|
|
if versionID == "" {
|
|
versionID = dobj.VersionID
|
|
}
|
|
|
|
defer func() {
|
|
replStatus := replicationStatus
|
|
if !versionPurgeStatus.Empty() {
|
|
replStatus = string(versionPurgeStatus)
|
|
}
|
|
auditLogInternal(context.Background(), bucket, dobj.ObjectName, AuditLogOptions{
|
|
Trigger: trigger,
|
|
APIName: ReplicateDeleteAPI,
|
|
VersionID: versionID,
|
|
Status: replStatus,
|
|
})
|
|
}()
|
|
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || rcfg == nil {
|
|
logger.LogIf(ctx, err)
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: dobj.ObjectName,
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
},
|
|
Host: "Internal: [Replication]",
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
})
|
|
return
|
|
}
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
|
|
if tgt == nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, rcfg.RoleArn))
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: dobj.ObjectName,
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
},
|
|
Host: "Internal: [Replication]",
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
})
|
|
return
|
|
}
|
|
|
|
if tgt.IsOffline() {
|
|
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, rcfg.RoleArn))
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: dobj.ObjectName,
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
},
|
|
Host: "Internal: [Replication]",
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
})
|
|
return
|
|
}
|
|
// Lock the object name before starting replication operation.
|
|
// Use separate lock that doesn't collide with regular objects.
|
|
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn))
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: dobj.ObjectName,
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
},
|
|
Host: "Internal: [Replication]",
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
})
|
|
return
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
// early return if already replicated delete marker for existing object replication
|
|
if dobj.DeleteMarkerVersionID != "" && dobj.OpType == replication.ExistingObjectReplicationType {
|
|
_, err := tgt.StatObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.StatObjectOptions{
|
|
VersionID: versionID,
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
ReplicationProxyRequest: "false",
|
|
}})
|
|
if isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
|
|
return
|
|
}
|
|
}
|
|
|
|
rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
|
|
VersionID: versionID,
|
|
Internal: miniogo.AdvancedRemoveOptions{
|
|
ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
|
|
ReplicationMTime: dobj.DeleteMarkerMTime.Time,
|
|
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
|
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
|
},
|
|
})
|
|
|
|
replicationStatus = dobj.DeleteMarkerReplicationStatus
|
|
versionPurgeStatus = dobj.VersionPurgeStatus
|
|
|
|
if rmErr != nil {
|
|
if dobj.VersionID == "" {
|
|
replicationStatus = string(replication.Failed)
|
|
} else {
|
|
versionPurgeStatus = Failed
|
|
}
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", rcfg.GetDestination().Bucket, dobj.ObjectName, versionID, rmErr))
|
|
} else {
|
|
if dobj.VersionID == "" {
|
|
replicationStatus = string(replication.Completed)
|
|
} else {
|
|
versionPurgeStatus = Complete
|
|
}
|
|
}
|
|
prevStatus := dobj.DeleteMarkerReplicationStatus
|
|
currStatus := replicationStatus
|
|
if dobj.VersionID != "" {
|
|
prevStatus = string(dobj.VersionPurgeStatus)
|
|
currStatus = string(versionPurgeStatus)
|
|
}
|
|
// to decrement pending count later.
|
|
globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus),
|
|
replication.StatusType(prevStatus), replication.DeleteReplicationType)
|
|
|
|
var eventName = event.ObjectReplicationComplete
|
|
if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed {
|
|
eventName = event.ObjectReplicationFailed
|
|
}
|
|
|
|
// Update metadata on the delete marker or purge permanent delete if replication success.
|
|
dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
|
|
VersionID: versionID,
|
|
DeleteMarkerReplicationStatus: replicationStatus,
|
|
VersionPurgeStatus: versionPurgeStatus,
|
|
Versioned: globalBucketVersioningSys.Enabled(bucket),
|
|
VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
|
|
})
|
|
if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on.
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err))
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: dobj.ObjectName,
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
},
|
|
Host: "Internal: [Replication]",
|
|
EventName: eventName,
|
|
})
|
|
} else {
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: dobjInfo,
|
|
Host: "Internal: [Replication]",
|
|
EventName: eventName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]string {
|
|
meta := make(map[string]string, len(oi.UserDefined))
|
|
for k, v := range oi.UserDefined {
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
|
continue
|
|
}
|
|
|
|
if equals(k, xhttp.AmzBucketReplicationStatus) {
|
|
continue
|
|
}
|
|
|
|
// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
|
|
if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
|
|
continue
|
|
}
|
|
meta[k] = v
|
|
}
|
|
|
|
if oi.ContentEncoding != "" {
|
|
meta[xhttp.ContentEncoding] = oi.ContentEncoding
|
|
}
|
|
|
|
if oi.ContentType != "" {
|
|
meta[xhttp.ContentType] = oi.ContentType
|
|
}
|
|
|
|
if oi.UserTags != "" {
|
|
meta[xhttp.AmzObjectTagging] = oi.UserTags
|
|
meta[xhttp.AmzTagDirective] = "REPLACE"
|
|
}
|
|
|
|
sc := dest.StorageClass
|
|
if sc == "" {
|
|
sc = oi.StorageClass
|
|
}
|
|
// drop non standard storage classes for tiering from replication
|
|
if sc != "" && (sc == storageclass.RRS || sc == storageclass.STANDARD) {
|
|
meta[xhttp.AmzStorageClass] = sc
|
|
}
|
|
|
|
meta[xhttp.MinIOSourceETag] = oi.ETag
|
|
meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339Nano)
|
|
meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
|
|
return meta
|
|
}
|
|
|
|
// caseInsensitiveMap is a string map whose Lookup tolerates the common
// spellings of a header key.
type caseInsensitiveMap map[string]string

// Lookup returns the value stored for key, trying in order the key exactly
// as given, its lower-cased form and its canonical HTTP header form. The
// boolean is false when none of the three spellings is present (other
// mixed-case spellings are not matched).
func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
	if len(m) == 0 {
		return "", false
	}
	if val, found := m[key]; found {
		return val, true
	}
	if val, found := m[strings.ToLower(key)]; found {
		return val, true
	}
	if val, found := m[http.CanonicalHeaderKey(key)]; found {
		return val, true
	}
	return "", false
}
|
|
|
|
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
|
|
meta := make(map[string]string)
|
|
for k, v := range objInfo.UserDefined {
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
|
continue
|
|
}
|
|
if isStandardHeader(k) {
|
|
continue
|
|
}
|
|
meta[k] = v
|
|
}
|
|
|
|
sc := dest.StorageClass
|
|
if sc == "" && (objInfo.StorageClass == storageclass.STANDARD || objInfo.StorageClass == storageclass.RRS) {
|
|
sc = objInfo.StorageClass
|
|
}
|
|
putOpts = miniogo.PutObjectOptions{
|
|
UserMetadata: meta,
|
|
ContentType: objInfo.ContentType,
|
|
ContentEncoding: objInfo.ContentEncoding,
|
|
StorageClass: sc,
|
|
Internal: miniogo.AdvancedPutOptions{
|
|
SourceVersionID: objInfo.VersionID,
|
|
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
|
SourceMTime: objInfo.ModTime,
|
|
SourceETag: objInfo.ETag,
|
|
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
|
},
|
|
}
|
|
if objInfo.UserTags != "" {
|
|
tag, _ := tags.ParseObjectTags(objInfo.UserTags)
|
|
if tag != nil {
|
|
putOpts.UserTags = tag.ToMap()
|
|
}
|
|
}
|
|
|
|
lkMap := caseInsensitiveMap(objInfo.UserDefined)
|
|
if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok {
|
|
putOpts.ContentLanguage = lang
|
|
}
|
|
if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok {
|
|
putOpts.ContentDisposition = disp
|
|
}
|
|
if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok {
|
|
putOpts.CacheControl = cc
|
|
}
|
|
if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok {
|
|
rmode := miniogo.RetentionMode(mode)
|
|
putOpts.Mode = rmode
|
|
}
|
|
if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
|
|
rdate, err := time.Parse(time.RFC3339, retainDateStr)
|
|
if err != nil {
|
|
return putOpts, err
|
|
}
|
|
putOpts.RetainUntilDate = rdate
|
|
}
|
|
if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok {
|
|
putOpts.LegalHold = miniogo.LegalHoldStatus(lhold)
|
|
}
|
|
if crypto.S3.IsEncrypted(objInfo.UserDefined) {
|
|
putOpts.ServerSideEncryption = encrypt.NewSSE()
|
|
}
|
|
return
|
|
}
|
|
|
|
// replicationAction names how much of an object has to be sent to the
// replication target.
type replicationAction string

const (
	// replicateNone: source and target already match, nothing to send.
	replicateNone replicationAction = "none"
	// replicateMetadata: only metadata differs from the target.
	replicateMetadata replicationAction = "metadata"
	// replicateAll: the whole object has to be replicated.
	replicateAll replicationAction = "all"
)
|
|
|
|
// equals reports whether k1 matches at least one of keys, ignoring case.
// It returns false when keys is empty.
func equals(k1 string, keys ...string) bool {
	for _, k2 := range keys {
		// EqualFold compares without allocating lower-cased copies.
		if strings.EqualFold(k1, k2) {
			return true
		}
	}
	return false
}
|
|
|
|
// returns replicationAction by comparing metadata between source and target
|
|
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
|
|
// needs full replication
|
|
if oi1.ETag != oi2.ETag ||
|
|
oi1.VersionID != oi2.VersionID ||
|
|
oi1.Size != oi2.Size ||
|
|
oi1.DeleteMarker != oi2.IsDeleteMarker ||
|
|
oi1.ModTime.Unix() != oi2.LastModified.Unix() {
|
|
return replicateAll
|
|
}
|
|
|
|
if oi1.ContentType != oi2.ContentType {
|
|
return replicateMetadata
|
|
}
|
|
|
|
if oi1.ContentEncoding != "" {
|
|
enc, ok := oi2.Metadata[xhttp.ContentEncoding]
|
|
if !ok {
|
|
enc, ok = oi2.Metadata[strings.ToLower(xhttp.ContentEncoding)]
|
|
if !ok {
|
|
return replicateMetadata
|
|
}
|
|
}
|
|
if strings.Join(enc, ",") != oi1.ContentEncoding {
|
|
return replicateMetadata
|
|
}
|
|
}
|
|
|
|
t, _ := tags.ParseObjectTags(oi1.UserTags)
|
|
if !reflect.DeepEqual(oi2.UserTags, t.ToMap()) {
|
|
return replicateMetadata
|
|
}
|
|
|
|
// Compare only necessary headers
|
|
compareKeys := []string{
|
|
"Expires",
|
|
"Cache-Control",
|
|
"Content-Language",
|
|
"Content-Disposition",
|
|
"X-Amz-Object-Lock-Mode",
|
|
"X-Amz-Object-Lock-Retain-Until-Date",
|
|
"X-Amz-Object-Lock-Legal-Hold",
|
|
"X-Amz-Website-Redirect-Location",
|
|
"X-Amz-Meta-",
|
|
}
|
|
|
|
// compare metadata on both maps to see if meta is identical
|
|
compareMeta1 := make(map[string]string)
|
|
for k, v := range oi1.UserDefined {
|
|
var found bool
|
|
for _, prefix := range compareKeys {
|
|
if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
|
|
continue
|
|
}
|
|
found = true
|
|
break
|
|
}
|
|
if found {
|
|
compareMeta1[strings.ToLower(k)] = v
|
|
}
|
|
}
|
|
|
|
compareMeta2 := make(map[string]string)
|
|
for k, v := range oi2.Metadata {
|
|
var found bool
|
|
for _, prefix := range compareKeys {
|
|
if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
|
|
continue
|
|
}
|
|
found = true
|
|
break
|
|
}
|
|
if found {
|
|
compareMeta2[strings.ToLower(k)] = strings.Join(v, ",")
|
|
}
|
|
}
|
|
|
|
if !reflect.DeepEqual(compareMeta1, compareMeta2) {
|
|
return replicateMetadata
|
|
}
|
|
|
|
return replicateNone
|
|
}
|
|
|
|
// replicateObject replicates the specified version of the object to destination bucket
|
|
// The source object is then updated to reflect the replication status.
|
|
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, trigger string) {
|
|
var replicationStatus replication.StatusType
|
|
defer func() {
|
|
if replicationStatus.Empty() {
|
|
// replication status is empty means
|
|
// replication was not attempted for some
|
|
// reason, notify the state of the object
|
|
// on disk.
|
|
replicationStatus = ri.ReplicationStatus
|
|
}
|
|
auditLogInternal(ctx, ri.Bucket, ri.Name, AuditLogOptions{
|
|
Trigger: trigger,
|
|
APIName: ReplicateObjectAPI,
|
|
VersionID: ri.VersionID,
|
|
Status: replicationStatus.String(),
|
|
})
|
|
}()
|
|
|
|
objInfo := ri.ObjectInfo
|
|
bucket := objInfo.Bucket
|
|
object := objInfo.Name
|
|
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
|
|
if tgt == nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, cfg.RoleArn))
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
if tgt.IsOffline() {
|
|
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, cfg.RoleArn))
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
// Lock the object name before starting replication.
|
|
// Use separate lock that doesn't collide with regular objects.
|
|
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object)
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn))
|
|
return
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
var closeOnDefer bool
|
|
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
|
|
VersionID: objInfo.VersionID,
|
|
})
|
|
if err != nil {
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replicate for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
|
|
return
|
|
}
|
|
defer func() {
|
|
if closeOnDefer {
|
|
gr.Close()
|
|
}
|
|
}()
|
|
closeOnDefer = true
|
|
|
|
objInfo = gr.ObjInfo
|
|
size, err := objInfo.GetActualSize()
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
|
|
dest := cfg.GetDestination()
|
|
if dest.Bucket == "" {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
|
|
rtype := replicateAll
|
|
oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{
|
|
VersionID: objInfo.VersionID,
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
ReplicationProxyRequest: "false",
|
|
}})
|
|
if err == nil {
|
|
rtype = getReplicationAction(objInfo, oi)
|
|
if rtype == replicateNone {
|
|
// object with same VersionID already exists, replication kicked off by
|
|
// PutObject might have completed
|
|
if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType {
|
|
// if metadata is not updated for some reason after replication, such as
|
|
// 503 encountered while updating metadata - make sure to set ReplicationStatus
|
|
// as Completed.
|
|
//
|
|
// Note: Replication Stats would have been updated despite metadata update failure.
|
|
gr.Close()
|
|
closeOnDefer = false
|
|
popts := ObjectOptions{
|
|
MTime: objInfo.ModTime,
|
|
VersionID: objInfo.VersionID,
|
|
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
|
|
}
|
|
for k, v := range objInfo.UserDefined {
|
|
popts.UserDefined[k] = v
|
|
}
|
|
if ri.OpType == replication.ExistingObjectReplicationType {
|
|
popts.UserDefined[xhttp.MinIOReplicationResetStatus] = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), ri.ResetID)
|
|
}
|
|
popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Completed.String()
|
|
if objInfo.UserTags != "" {
|
|
popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
|
}
|
|
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
replicationStatus = replication.Completed
|
|
// use core client to avoid doing multipart on PUT
|
|
c := &miniogo.Core{Client: tgt.Client}
|
|
if rtype != replicateAll {
|
|
// replicate metadata for object tagging/copy with metadata replacement
|
|
srcOpts := miniogo.CopySrcOptions{
|
|
Bucket: dest.Bucket,
|
|
Object: object,
|
|
VersionID: objInfo.VersionID,
|
|
}
|
|
dstOpts := miniogo.PutObjectOptions{
|
|
Internal: miniogo.AdvancedPutOptions{
|
|
SourceVersionID: objInfo.VersionID,
|
|
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
|
}}
|
|
if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), srcOpts, dstOpts); err != nil {
|
|
replicationStatus = replication.Failed
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
}
|
|
} else {
|
|
putOpts, err := putReplicationOpts(ctx, dest, objInfo)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err))
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
var headerSize int
|
|
for k, v := range putOpts.Header() {
|
|
headerSize += len(k) + len(v)
|
|
}
|
|
|
|
opts := &bandwidth.MonitorReaderOptions{
|
|
Bucket: objInfo.Bucket,
|
|
HeaderSize: headerSize,
|
|
}
|
|
newCtx := ctx
|
|
if globalBucketMonitor.IsThrottled(bucket) {
|
|
var cancel context.CancelFunc
|
|
newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
|
|
defer cancel()
|
|
}
|
|
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
|
if objInfo.isMultipart() {
|
|
if err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object,
|
|
r, objInfo, putOpts); err != nil {
|
|
replicationStatus = replication.Failed
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
}
|
|
} else {
|
|
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
|
|
replicationStatus = replication.Failed
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
}
|
|
}
|
|
}
|
|
gr.Close()
|
|
closeOnDefer = false
|
|
|
|
prevReplStatus := objInfo.ReplicationStatus
|
|
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
|
|
if objInfo.UserTags != "" {
|
|
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
|
}
|
|
if ri.OpType == replication.ExistingObjectReplicationType {
|
|
objInfo.UserDefined[xhttp.MinIOReplicationResetStatus] = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), ri.ResetID)
|
|
}
|
|
// FIXME: add support for missing replication events
|
|
// - event.ObjectReplicationMissedThreshold
|
|
// - event.ObjectReplicationReplicatedAfterThreshold
|
|
var eventName = event.ObjectReplicationComplete
|
|
if replicationStatus == replication.Failed {
|
|
eventName = event.ObjectReplicationFailed
|
|
}
|
|
|
|
// Leave metadata in `PENDING` state if inline replication fails to save iops
|
|
if ri.OpType == replication.HealReplicationType ||
|
|
replicationStatus == replication.Completed {
|
|
popts := ObjectOptions{
|
|
MTime: objInfo.ModTime,
|
|
VersionID: objInfo.VersionID,
|
|
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
|
|
}
|
|
for k, v := range objInfo.UserDefined {
|
|
popts.UserDefined[k] = v
|
|
}
|
|
popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
|
|
if objInfo.UserTags != "" {
|
|
popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
|
}
|
|
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w",
|
|
bucket, objInfo.Name, objInfo.VersionID, err))
|
|
}
|
|
opType := replication.MetadataReplicationType
|
|
if rtype == replicateAll {
|
|
opType = replication.ObjectReplicationType
|
|
}
|
|
globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
|
|
sendEvent(eventArgs{
|
|
EventName: eventName,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
}
|
|
// re-queue failures once more - keep a retry count to avoid flooding the queue if
|
|
// the target site is down. Leave it to scanner to catch up instead.
|
|
if replicationStatus != replication.Completed && ri.RetryCount < 1 {
|
|
ri.OpType = replication.HealReplicationType
|
|
ri.RetryCount++
|
|
globalReplicationPool.queueReplicaFailedTask(ri)
|
|
}
|
|
}
|
|
|
|
func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (err error) {
|
|
var uploadedParts []miniogo.CompletePart
|
|
uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
// block and abort remote upload upon failure.
|
|
if aerr := c.AbortMultipartUpload(ctx, bucket, object, uploadID); aerr != nil {
|
|
aerr = fmt.Errorf("Unable to cleanup failed multipart replication %s on remote %s/%s: %w", uploadID, bucket, object, aerr)
|
|
logger.LogIf(ctx, aerr)
|
|
}
|
|
}
|
|
}()
|
|
|
|
var (
|
|
hr *hash.Reader
|
|
pInfo miniogo.ObjectPart
|
|
)
|
|
|
|
for _, partInfo := range objInfo.Parts {
|
|
hr, err = hash.NewReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.ActualSize, "", "", opts.ServerSideEncryption)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if pInfo.Size != partInfo.ActualSize {
|
|
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
|
|
}
|
|
uploadedParts = append(uploadedParts, miniogo.CompletePart{
|
|
PartNumber: pInfo.PartNumber,
|
|
ETag: pInfo.ETag,
|
|
})
|
|
}
|
|
_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{
|
|
Internal: miniogo.AdvancedPutOptions{
|
|
SourceMTime: objInfo.ModTime,
|
|
// always set this to distinguish between `mc mirror` replication and serverside
|
|
ReplicationRequest: true,
|
|
}})
|
|
return err
|
|
}
|
|
|
|
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
|
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
|
|
// Copy on write
|
|
dst := metadata
|
|
var copied bool
|
|
delKey := func(key string) {
|
|
if _, ok := metadata[key]; !ok {
|
|
return
|
|
}
|
|
if !copied {
|
|
dst = make(map[string]string, len(metadata))
|
|
for k, v := range metadata {
|
|
dst[k] = v
|
|
}
|
|
copied = true
|
|
}
|
|
delete(dst, key)
|
|
}
|
|
|
|
delKey(xhttp.AmzBucketReplicationStatus)
|
|
return dst
|
|
}
|
|
|
|
// DeletedObjectReplicationInfo has info on deleted object
|
|
type DeletedObjectReplicationInfo struct {
|
|
DeletedObject
|
|
Bucket string
|
|
OpType replication.Type
|
|
ResetID string
|
|
}
|
|
|
|
// Replication specific APIName
const (
	// ReplicateObjectAPI is the API name used for object replication.
	ReplicateObjectAPI = "ReplicateObject"
	// ReplicateDeleteAPI is the API name used for delete replication.
	ReplicateDeleteAPI = "ReplicateDelete"
)
|
|
|
|
// Audit trail labels attached to replication activity.
const (
	// ReplicateQueued - replication being queued trail
	ReplicateQueued = "replicate:queue"

	// ReplicateExisting - audit trail for existing objects replication
	ReplicateExisting = "replicate:existing"

	// ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers
	ReplicateExistingDelete = "replicate:existing:delete"

	// ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
	ReplicateMRF = "replicate:mrf"

	// ReplicateIncoming - audit trail indicating replication started [could be from incoming/existing/heal activity]
	ReplicateIncoming = "replicate:incoming"

	// ReplicateHeal - audit trail for healing of failed/pending replications
	ReplicateHeal = "replicate:heal"

	// ReplicateDelete - audit trail for delete replication
	ReplicateDelete = "replicate:delete"
)
|
|
|
|
var (
|
|
globalReplicationPool *ReplicationPool
|
|
globalReplicationStats *ReplicationStats
|
|
)
|
|
|
|
// ReplicationPool describes replication pool
|
|
type ReplicationPool struct {
|
|
objLayer ObjectLayer
|
|
ctx context.Context
|
|
mrfWorkerKillCh chan struct{}
|
|
workerKillCh chan struct{}
|
|
replicaCh chan ReplicateObjectInfo
|
|
replicaDeleteCh chan DeletedObjectReplicationInfo
|
|
mrfReplicaCh chan ReplicateObjectInfo
|
|
existingReplicaCh chan ReplicateObjectInfo
|
|
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
|
|
workerSize int
|
|
mrfWorkerSize int
|
|
workerWg sync.WaitGroup
|
|
mrfWorkerWg sync.WaitGroup
|
|
once sync.Once
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// NewReplicationPool creates a pool of replication workers of specified size
|
|
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
|
|
pool := &ReplicationPool{
|
|
replicaCh: make(chan ReplicateObjectInfo, 100000),
|
|
replicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
|
|
mrfReplicaCh: make(chan ReplicateObjectInfo, 100000),
|
|
workerKillCh: make(chan struct{}, opts.Workers),
|
|
mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers),
|
|
existingReplicaCh: make(chan ReplicateObjectInfo, 100000),
|
|
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
|
|
ctx: ctx,
|
|
objLayer: o,
|
|
}
|
|
|
|
pool.ResizeWorkers(opts.Workers)
|
|
pool.ResizeFailedWorkers(opts.FailedWorkers)
|
|
go pool.AddExistingObjectReplicateWorker()
|
|
return pool
|
|
}
|
|
|
|
// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
|
|
// to the other workers
|
|
func (p *ReplicationPool) AddMRFWorker() {
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case oi, ok := <-p.mrfReplicaCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateMRF)
|
|
case <-p.mrfWorkerKillCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// AddWorker adds a replication worker to the pool
|
|
func (p *ReplicationPool) AddWorker() {
|
|
defer p.workerWg.Done()
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case oi, ok := <-p.replicaCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateIncoming)
|
|
case doi, ok := <-p.replicaDeleteCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateDelete(p.ctx, doi, p.objLayer, ReplicateDelete)
|
|
case <-p.workerKillCh:
|
|
return
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd
|
|
func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case oi, ok := <-p.existingReplicaCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateExisting)
|
|
case doi, ok := <-p.existingReplicaDeleteCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateDelete(p.ctx, doi, p.objLayer, ReplicateExistingDelete)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ResizeWorkers sets replication workers pool to new size
|
|
func (p *ReplicationPool) ResizeWorkers(n int) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
for p.workerSize < n {
|
|
p.workerSize++
|
|
p.workerWg.Add(1)
|
|
go p.AddWorker()
|
|
}
|
|
for p.workerSize > n {
|
|
p.workerSize--
|
|
go func() { p.workerKillCh <- struct{}{} }()
|
|
}
|
|
}
|
|
|
|
// ResizeFailedWorkers sets replication failed workers pool size
|
|
func (p *ReplicationPool) ResizeFailedWorkers(n int) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
for p.mrfWorkerSize < n {
|
|
p.mrfWorkerSize++
|
|
p.mrfWorkerWg.Add(1)
|
|
go p.AddMRFWorker()
|
|
}
|
|
for p.mrfWorkerSize > n {
|
|
p.mrfWorkerSize--
|
|
go func() { p.mrfWorkerKillCh <- struct{}{} }()
|
|
}
|
|
}
|
|
|
|
func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
|
|
if p == nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-GlobalContext.Done():
|
|
p.once.Do(func() {
|
|
close(p.replicaCh)
|
|
close(p.mrfReplicaCh)
|
|
close(p.existingReplicaCh)
|
|
})
|
|
case p.mrfReplicaCh <- ri:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
|
|
if p == nil {
|
|
return
|
|
}
|
|
var ch chan ReplicateObjectInfo
|
|
switch ri.OpType {
|
|
case replication.ExistingObjectReplicationType:
|
|
ch = p.existingReplicaCh
|
|
case replication.HealReplicationType:
|
|
fallthrough
|
|
default:
|
|
ch = p.replicaCh
|
|
}
|
|
select {
|
|
case <-GlobalContext.Done():
|
|
p.once.Do(func() {
|
|
close(p.replicaCh)
|
|
close(p.mrfReplicaCh)
|
|
close(p.existingReplicaCh)
|
|
})
|
|
case ch <- ri:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInfo) {
|
|
if p == nil {
|
|
return
|
|
}
|
|
|
|
var ch chan DeletedObjectReplicationInfo
|
|
switch doi.OpType {
|
|
case replication.ExistingObjectReplicationType:
|
|
ch = p.existingReplicaDeleteCh
|
|
case replication.HealReplicationType:
|
|
fallthrough
|
|
default:
|
|
ch = p.replicaDeleteCh
|
|
}
|
|
|
|
select {
|
|
case <-GlobalContext.Done():
|
|
p.once.Do(func() {
|
|
close(p.replicaDeleteCh)
|
|
close(p.existingReplicaDeleteCh)
|
|
})
|
|
case ch <- doi:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// replicationPoolOpts holds the worker counts used to build a ReplicationPool.
type replicationPoolOpts struct {
	// Workers is the number of regular replication workers to start.
	Workers int
	// FailedWorkers is the number of MRF (failed replication) workers to start.
	FailedWorkers int
}
|
|
|
|
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
|
|
globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
|
|
Workers: globalAPIConfig.getReplicationWorkers(),
|
|
FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(),
|
|
})
|
|
globalReplicationStats = NewReplicationStats(ctx, objectAPI)
|
|
}
|
|
|
|
// get Reader from replication target if active-active replication is in place and
|
|
// this node returns a 404
|
|
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, proxy bool) {
|
|
tgt, oi, proxy, err := proxyHeadToRepTarget(ctx, bucket, object, opts)
|
|
if !proxy || err != nil {
|
|
return nil, false
|
|
}
|
|
fn, off, length, err := NewGetObjectReader(rs, oi, opts)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
gopts := miniogo.GetObjectOptions{
|
|
VersionID: opts.VersionID,
|
|
ServerSideEncryption: opts.ServerSideEncryption,
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
ReplicationProxyRequest: "true",
|
|
},
|
|
}
|
|
// get correct offsets for encrypted object
|
|
if off >= 0 && length >= 0 {
|
|
if err := gopts.SetRange(off, off+length-1); err != nil {
|
|
return nil, false
|
|
}
|
|
}
|
|
// Make sure to match ETag when proxying.
|
|
if err = gopts.SetMatchETag(oi.ETag); err != nil {
|
|
return nil, false
|
|
}
|
|
c := miniogo.Core{Client: tgt.Client}
|
|
obj, _, _, err := c.GetObject(ctx, bucket, object, gopts)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
closeReader := func() { obj.Close() }
|
|
|
|
reader, err := fn(obj, h, closeReader)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
reader.ObjInfo = oi.Clone()
|
|
return reader, true
|
|
}
|
|
|
|
// isProxyable returns true if replication config found for this bucket
|
|
func isProxyable(ctx context.Context, bucket string) bool {
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
dest := cfg.GetDestination()
|
|
return dest.Bucket == bucket
|
|
}
|
|
|
|
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (tgt *TargetClient, oi ObjectInfo, proxy bool, err error) {
|
|
// this option is set when active-active replication is in place between site A -> B,
|
|
// and site B does not have the object yet.
|
|
if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
|
|
return nil, oi, false, nil
|
|
}
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
return nil, oi, false, err
|
|
}
|
|
dest := cfg.GetDestination()
|
|
if dest.Bucket != bucket { // not active-active
|
|
return nil, oi, false, err
|
|
}
|
|
ssec := false
|
|
if opts.ServerSideEncryption != nil {
|
|
ssec = opts.ServerSideEncryption.Type() == encrypt.SSEC
|
|
}
|
|
ropts := replication.ObjectOpts{
|
|
Name: object,
|
|
SSEC: ssec,
|
|
}
|
|
if !cfg.Replicate(ropts) { // no matching rule for object prefix
|
|
return nil, oi, false, nil
|
|
}
|
|
tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
|
|
if tgt == nil || tgt.IsOffline() {
|
|
return nil, oi, false, fmt.Errorf("target is offline or not configured")
|
|
}
|
|
// if proxying explicitly disabled on remote target
|
|
if tgt.disableProxy {
|
|
return nil, oi, false, nil
|
|
}
|
|
gopts := miniogo.GetObjectOptions{
|
|
VersionID: opts.VersionID,
|
|
ServerSideEncryption: opts.ServerSideEncryption,
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
ReplicationProxyRequest: "true",
|
|
},
|
|
}
|
|
|
|
objInfo, err := tgt.StatObject(ctx, dest.Bucket, object, gopts)
|
|
if err != nil {
|
|
return nil, oi, false, err
|
|
}
|
|
|
|
tags, _ := tags.MapToObjectTags(objInfo.UserTags)
|
|
oi = ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: object,
|
|
ModTime: objInfo.LastModified,
|
|
Size: objInfo.Size,
|
|
ETag: objInfo.ETag,
|
|
VersionID: objInfo.VersionID,
|
|
IsLatest: objInfo.IsLatest,
|
|
DeleteMarker: objInfo.IsDeleteMarker,
|
|
ContentType: objInfo.ContentType,
|
|
Expires: objInfo.Expires,
|
|
StorageClass: objInfo.StorageClass,
|
|
ReplicationStatus: replication.StatusType(objInfo.ReplicationStatus),
|
|
UserTags: tags.String(),
|
|
}
|
|
oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
|
|
for k, v := range objInfo.Metadata {
|
|
oi.UserDefined[k] = v[0]
|
|
}
|
|
ce, ok := oi.UserDefined[xhttp.ContentEncoding]
|
|
if !ok {
|
|
ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
|
|
}
|
|
if ok {
|
|
oi.ContentEncoding = ce
|
|
}
|
|
return tgt, oi, true, nil
|
|
}
|
|
|
|
// get object info from replication target if active-active replication is in place and
|
|
// this node returns a 404
|
|
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, proxy bool, err error) {
|
|
_, oi, proxy, err = proxyHeadToRepTarget(ctx, bucket, object, opts)
|
|
return oi, proxy, err
|
|
}
|
|
|
|
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) {
|
|
if sync {
|
|
replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o, ReplicateIncoming)
|
|
} else {
|
|
globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType})
|
|
}
|
|
if sz, err := objInfo.GetActualSize(); err == nil {
|
|
globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
|
|
}
|
|
}
|
|
|
|
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer, sync bool) {
|
|
globalReplicationPool.queueReplicaDeleteTask(dv)
|
|
globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
|
|
}
|
|
|
|
type replicationConfig struct {
|
|
Config *replication.Config
|
|
ResetID string
|
|
ResetBeforeDate time.Time
|
|
}
|
|
|
|
func (c replicationConfig) Empty() bool {
|
|
return c.Config == nil
|
|
}
|
|
func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
|
|
return c.Config.Replicate(opts)
|
|
}
|
|
|
|
// Resync returns true if replication reset is requested
|
|
func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo) bool {
|
|
if c.Empty() {
|
|
return false
|
|
}
|
|
// existing object replication does not apply to un-versioned objects
|
|
if oi.VersionID == "" || oi.VersionID == nullVersionID {
|
|
return false
|
|
}
|
|
|
|
var replicate bool
|
|
if oi.DeleteMarker {
|
|
if c.Replicate(replication.ObjectOpts{
|
|
Name: oi.Name,
|
|
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
|
|
UserTags: oi.UserTags,
|
|
DeleteMarker: oi.DeleteMarker,
|
|
VersionID: oi.VersionID,
|
|
OpType: replication.DeleteReplicationType,
|
|
ExistingObject: true}) {
|
|
replicate = true
|
|
}
|
|
} else {
|
|
// Ignore previous replication status when deciding if object can be re-replicated
|
|
objInfo := oi.Clone()
|
|
objInfo.ReplicationStatus = replication.StatusType("")
|
|
replicate, _ = mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(objInfo, replication.ExistingObjectReplicationType))
|
|
}
|
|
return c.resync(oi, replicate)
|
|
}
|
|
|
|
// wrapper function for testability. Returns true if a new reset is requested on
|
|
// already replicated objects OR object qualifies for existing object replication
|
|
// and no reset requested.
|
|
func (c replicationConfig) resync(oi ObjectInfo, replicate bool) bool {
|
|
if !replicate {
|
|
return false
|
|
}
|
|
rs, ok := oi.UserDefined[xhttp.MinIOReplicationResetStatus]
|
|
if !ok { // existing object replication is enabled and object version is unreplicated so far.
|
|
if c.ResetID != "" && oi.ModTime.Before(c.ResetBeforeDate) { // trigger replication if `mc replicate reset` requested
|
|
return true
|
|
}
|
|
return oi.ReplicationStatus != replication.Completed
|
|
}
|
|
if c.ResetID == "" || c.ResetBeforeDate.Equal(timeSentinel) { // no reset in progress
|
|
return false
|
|
}
|
|
// if already replicated, return true if a new reset was requested.
|
|
splits := strings.SplitN(rs, ";", 2)
|
|
newReset := splits[1] != c.ResetID
|
|
if !newReset && oi.ReplicationStatus == replication.Completed {
|
|
// already replicated and no reset requested
|
|
return false
|
|
}
|
|
return newReset && oi.ModTime.Before(c.ResetBeforeDate)
|
|
}
|