mirror of
https://github.com/minio/minio.git
synced 2025-01-13 07:53:21 -05:00
7875d472bc
Skip notifications on objects that might have had an error during deletion, this also avoids unnecessary replication attempt on such objects. Refactor some places to make sure that we have notified the client before we - notify - schedule for replication - lifecycle etc.
995 lines
30 KiB
Go
995 lines
30 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
|
|
minio "github.com/minio/minio-go/v7"
|
|
miniogo "github.com/minio/minio-go/v7"
|
|
"github.com/minio/minio-go/v7/pkg/encrypt"
|
|
"github.com/minio/minio-go/v7/pkg/tags"
|
|
"github.com/minio/minio/cmd/crypto"
|
|
xhttp "github.com/minio/minio/cmd/http"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/bucket/bandwidth"
|
|
"github.com/minio/minio/pkg/bucket/replication"
|
|
"github.com/minio/minio/pkg/event"
|
|
iampolicy "github.com/minio/minio/pkg/iam/policy"
|
|
"github.com/minio/minio/pkg/madmin"
|
|
)
|
|
|
|
// gets replication config associated to a given bucket name.
|
|
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
|
|
if globalIsGateway {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return nil, errServerNotInitialized
|
|
}
|
|
|
|
return nil, BucketReplicationConfigNotFound{Bucket: bucketName}
|
|
}
|
|
|
|
return globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
|
|
}
|
|
|
|
// validateReplicationDestination returns error if replication destination bucket missing or not configured
|
|
// It also returns true if replication destination is same as this server.
|
|
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) {
|
|
arn, err := madmin.ParseARN(rCfg.RoleArn)
|
|
if err != nil {
|
|
return false, BucketRemoteArnInvalid{}
|
|
}
|
|
if arn.Type != madmin.ReplicationService {
|
|
return false, BucketRemoteArnTypeInvalid{}
|
|
}
|
|
clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rCfg.RoleArn)
|
|
if clnt == nil {
|
|
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
|
}
|
|
if found, _ := clnt.BucketExists(ctx, rCfg.GetDestination().Bucket); !found {
|
|
return false, BucketRemoteDestinationNotFound{Bucket: rCfg.GetDestination().Bucket}
|
|
}
|
|
if ret, err := globalBucketObjectLockSys.Get(bucket); err == nil {
|
|
if ret.LockEnabled {
|
|
lock, _, _, _, err := clnt.GetObjectLockConfig(ctx, rCfg.GetDestination().Bucket)
|
|
if err != nil || lock != "Enabled" {
|
|
return false, BucketReplicationDestinationMissingLock{Bucket: rCfg.GetDestination().Bucket}
|
|
}
|
|
}
|
|
}
|
|
// validate replication ARN against target endpoint
|
|
c, ok := globalBucketTargetSys.arnRemotesMap[rCfg.RoleArn]
|
|
if ok {
|
|
if c.EndpointURL().String() == clnt.EndpointURL().String() {
|
|
sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
|
|
return sameTarget, nil
|
|
}
|
|
}
|
|
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
|
}
|
|
|
|
func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) (replicate bool, sync bool) {
|
|
if permErr != ErrNone {
|
|
return
|
|
}
|
|
return mustReplicater(ctx, bucket, object, meta, replStatus)
|
|
}
|
|
|
|
// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
|
|
// a synchronous manner.
|
|
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) (replicate bool, sync bool) {
|
|
if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
|
|
return
|
|
}
|
|
return mustReplicater(ctx, bucket, object, meta, replStatus)
|
|
}
|
|
|
|
// mustReplicater returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
|
|
// a synchronous manner.
|
|
func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string) (replicate bool, sync bool) {
|
|
if globalIsGateway {
|
|
return replicate, sync
|
|
}
|
|
if rs, ok := meta[xhttp.AmzBucketReplicationStatus]; ok {
|
|
replStatus = rs
|
|
}
|
|
if replication.StatusType(replStatus) == replication.Replica {
|
|
return replicate, sync
|
|
}
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
return replicate, sync
|
|
}
|
|
opts := replication.ObjectOpts{
|
|
Name: object,
|
|
SSEC: crypto.SSEC.IsEncrypted(meta),
|
|
}
|
|
tagStr, ok := meta[xhttp.AmzObjectTagging]
|
|
if ok {
|
|
opts.UserTags = tagStr
|
|
}
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
|
|
// the target online status should not be used here while deciding
|
|
// whether to replicate as the target could be temporarily down
|
|
if tgt != nil {
|
|
return cfg.Replicate(opts), tgt.replicateSync
|
|
}
|
|
return cfg.Replicate(opts), false
|
|
}
|
|
|
|
// Standard headers that needs to be extracted from User metadata.
|
|
var standardHeaders = []string{
|
|
xhttp.ContentType,
|
|
xhttp.CacheControl,
|
|
xhttp.ContentEncoding,
|
|
xhttp.ContentLanguage,
|
|
xhttp.ContentDisposition,
|
|
xhttp.AmzStorageClass,
|
|
xhttp.AmzObjectTagging,
|
|
xhttp.AmzBucketReplicationStatus,
|
|
xhttp.AmzObjectLockMode,
|
|
xhttp.AmzObjectLockRetainUntilDate,
|
|
xhttp.AmzObjectLockLegalHold,
|
|
xhttp.AmzTagCount,
|
|
xhttp.AmzServerSideEncryption,
|
|
}
|
|
|
|
// returns true if any of the objects being deleted qualifies for replication.
|
|
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
|
|
c, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || c == nil {
|
|
return false
|
|
}
|
|
for _, obj := range objects {
|
|
if c.HasActiveRules(obj.ObjectName, true) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// isStandardHeader returns true if header is a supported header and not a custom header
|
|
func isStandardHeader(matchHeaderKey string) bool {
|
|
return equals(matchHeaderKey, standardHeaders...)
|
|
}
|
|
|
|
// returns whether object version is a deletemarker and if object qualifies for replication
|
|
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate, sync bool) {
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || rcfg == nil {
|
|
return false, false, sync
|
|
}
|
|
// when incoming delete is removal of a delete marker( a.k.a versioned delete),
|
|
// GetObjectInfo returns extra information even though it returns errFileNotFound
|
|
if gerr != nil {
|
|
validReplStatus := false
|
|
switch oi.ReplicationStatus {
|
|
// Point to note: even if two way replication with Deletemarker or Delete sync is enabled,
|
|
// deletion of a REPLICA object/deletemarker will not trigger a replication to the other cluster.
|
|
case replication.Pending, replication.Completed, replication.Failed:
|
|
validReplStatus = true
|
|
}
|
|
if oi.DeleteMarker && validReplStatus {
|
|
return oi.DeleteMarker, true, sync
|
|
}
|
|
return oi.DeleteMarker, false, sync
|
|
}
|
|
if oi.ReplicationStatus == replication.Replica {
|
|
// avoid replicating a replica in bi-directional replication
|
|
return oi.DeleteMarker, false, sync
|
|
}
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
|
|
// the target online status should not be used here while deciding
|
|
// whether to replicate deletes as the target could be temporarily down
|
|
if tgt == nil {
|
|
return oi.DeleteMarker, false, false
|
|
}
|
|
opts := replication.ObjectOpts{
|
|
Name: dobj.ObjectName,
|
|
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
|
|
UserTags: oi.UserTags,
|
|
DeleteMarker: oi.DeleteMarker,
|
|
VersionID: dobj.VersionID,
|
|
}
|
|
return oi.DeleteMarker, rcfg.Replicate(opts), tgt.replicateSync
|
|
}
|
|
|
|
// replicate deletes to the designated replication target if replication configuration
|
|
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
|
|
// is specified) enabled.
|
|
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
|
|
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
|
|
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
|
|
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
|
|
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
|
|
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
|
|
// on target.
|
|
func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectAPI ObjectLayer) {
|
|
bucket := dobj.Bucket
|
|
versionID := dobj.DeleteMarkerVersionID
|
|
if versionID == "" {
|
|
versionID = dobj.VersionID
|
|
}
|
|
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || rcfg == nil {
|
|
logger.LogIf(ctx, err)
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: dobj.ObjectName,
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
},
|
|
Host: "Internal: [Replication]",
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
})
|
|
return
|
|
}
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
|
|
if tgt == nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, rcfg.RoleArn))
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: dobj.ObjectName,
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
},
|
|
Host: "Internal: [Replication]",
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
})
|
|
return
|
|
}
|
|
|
|
rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
|
|
VersionID: versionID,
|
|
Internal: miniogo.AdvancedRemoveOptions{
|
|
ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
|
|
ReplicationMTime: dobj.DeleteMarkerMTime.Time,
|
|
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
|
},
|
|
})
|
|
|
|
replicationStatus := dobj.DeleteMarkerReplicationStatus
|
|
versionPurgeStatus := dobj.VersionPurgeStatus
|
|
|
|
if rmErr != nil {
|
|
if dobj.VersionID == "" {
|
|
replicationStatus = string(replication.Failed)
|
|
} else {
|
|
versionPurgeStatus = Failed
|
|
}
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", rcfg.GetDestination().Bucket, dobj.ObjectName, versionID, rmErr))
|
|
} else {
|
|
if dobj.VersionID == "" {
|
|
replicationStatus = string(replication.Completed)
|
|
} else {
|
|
versionPurgeStatus = Complete
|
|
}
|
|
}
|
|
|
|
var eventName = event.ObjectReplicationComplete
|
|
if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed {
|
|
eventName = event.ObjectReplicationFailed
|
|
}
|
|
|
|
// Update metadata on the delete marker or purge permanent delete if replication success.
|
|
dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
|
|
VersionID: versionID,
|
|
DeleteMarkerReplicationStatus: replicationStatus,
|
|
VersionPurgeStatus: versionPurgeStatus,
|
|
Versioned: globalBucketVersioningSys.Enabled(bucket),
|
|
VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
|
|
})
|
|
if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on.
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err))
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: dobj.ObjectName,
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
},
|
|
Host: "Internal: [Replication]",
|
|
EventName: eventName,
|
|
})
|
|
} else {
|
|
sendEvent(eventArgs{
|
|
BucketName: bucket,
|
|
Object: dobjInfo,
|
|
Host: "Internal: [Replication]",
|
|
EventName: eventName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]string {
|
|
meta := make(map[string]string, len(oi.UserDefined))
|
|
for k, v := range oi.UserDefined {
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
|
continue
|
|
}
|
|
|
|
if equals(k, xhttp.AmzBucketReplicationStatus) {
|
|
continue
|
|
}
|
|
|
|
// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
|
|
if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
|
|
continue
|
|
}
|
|
|
|
meta[k] = v
|
|
}
|
|
|
|
if oi.ContentEncoding != "" {
|
|
meta[xhttp.ContentEncoding] = oi.ContentEncoding
|
|
}
|
|
|
|
if oi.ContentType != "" {
|
|
meta[xhttp.ContentType] = oi.ContentType
|
|
}
|
|
|
|
if oi.UserTags != "" {
|
|
meta[xhttp.AmzObjectTagging] = oi.UserTags
|
|
meta[xhttp.AmzTagDirective] = "REPLACE"
|
|
}
|
|
|
|
sc := dest.StorageClass
|
|
if sc == "" {
|
|
sc = oi.StorageClass
|
|
}
|
|
if sc != "" {
|
|
meta[xhttp.AmzStorageClass] = sc
|
|
}
|
|
meta[xhttp.MinIOSourceETag] = oi.ETag
|
|
meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339Nano)
|
|
meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
|
|
return meta
|
|
}
|
|
|
|
type caseInsensitiveMap map[string]string
|
|
|
|
// Lookup map entry case insensitively.
|
|
func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
|
|
if len(m) == 0 {
|
|
return "", false
|
|
}
|
|
for _, k := range []string{
|
|
key,
|
|
strings.ToLower(key),
|
|
http.CanonicalHeaderKey(key),
|
|
} {
|
|
v, ok := m[k]
|
|
if ok {
|
|
return v, ok
|
|
}
|
|
}
|
|
return "", false
|
|
}
|
|
|
|
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
|
|
meta := make(map[string]string)
|
|
for k, v := range objInfo.UserDefined {
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
|
continue
|
|
}
|
|
if isStandardHeader(k) {
|
|
continue
|
|
}
|
|
meta[k] = v
|
|
}
|
|
|
|
sc := dest.StorageClass
|
|
if sc == "" {
|
|
sc = objInfo.StorageClass
|
|
}
|
|
putOpts = miniogo.PutObjectOptions{
|
|
UserMetadata: meta,
|
|
ContentType: objInfo.ContentType,
|
|
ContentEncoding: objInfo.ContentEncoding,
|
|
StorageClass: sc,
|
|
Internal: miniogo.AdvancedPutOptions{
|
|
SourceVersionID: objInfo.VersionID,
|
|
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
|
SourceMTime: objInfo.ModTime,
|
|
SourceETag: objInfo.ETag,
|
|
},
|
|
}
|
|
if objInfo.UserTags != "" {
|
|
tag, _ := tags.ParseObjectTags(objInfo.UserTags)
|
|
if tag != nil {
|
|
putOpts.UserTags = tag.ToMap()
|
|
}
|
|
}
|
|
|
|
lkMap := caseInsensitiveMap(objInfo.UserDefined)
|
|
if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok {
|
|
putOpts.ContentLanguage = lang
|
|
}
|
|
if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok {
|
|
putOpts.ContentDisposition = disp
|
|
}
|
|
if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok {
|
|
putOpts.CacheControl = cc
|
|
}
|
|
if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok {
|
|
rmode := miniogo.RetentionMode(mode)
|
|
putOpts.Mode = rmode
|
|
}
|
|
if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
|
|
rdate, err := time.Parse(time.RFC3339, retainDateStr)
|
|
if err != nil {
|
|
return putOpts, err
|
|
}
|
|
putOpts.RetainUntilDate = rdate
|
|
}
|
|
if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok {
|
|
putOpts.LegalHold = miniogo.LegalHoldStatus(lhold)
|
|
}
|
|
if crypto.S3.IsEncrypted(objInfo.UserDefined) {
|
|
putOpts.ServerSideEncryption = encrypt.NewSSE()
|
|
}
|
|
return
|
|
}
|
|
|
|
type replicationAction string
|
|
|
|
const (
|
|
replicateMetadata replicationAction = "metadata"
|
|
replicateNone replicationAction = "none"
|
|
replicateAll replicationAction = "all"
|
|
)
|
|
|
|
// matches k1 with all keys, returns 'true' if one of them matches
|
|
func equals(k1 string, keys ...string) bool {
|
|
for _, k2 := range keys {
|
|
if strings.ToLower(k1) == strings.ToLower(k2) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// returns replicationAction by comparing metadata between source and target
|
|
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
|
|
// needs full replication
|
|
if oi1.ETag != oi2.ETag ||
|
|
oi1.VersionID != oi2.VersionID ||
|
|
oi1.Size != oi2.Size ||
|
|
oi1.DeleteMarker != oi2.IsDeleteMarker ||
|
|
oi1.ModTime.Unix() != oi2.LastModified.Unix() {
|
|
return replicateAll
|
|
}
|
|
|
|
if oi1.ContentType != oi2.ContentType {
|
|
return replicateMetadata
|
|
}
|
|
|
|
if oi1.ContentEncoding != "" {
|
|
enc, ok := oi2.Metadata[xhttp.ContentEncoding]
|
|
if !ok {
|
|
enc, ok = oi2.Metadata[strings.ToLower(xhttp.ContentEncoding)]
|
|
if !ok {
|
|
return replicateMetadata
|
|
}
|
|
}
|
|
if strings.Join(enc, ",") != oi1.ContentEncoding {
|
|
return replicateMetadata
|
|
}
|
|
}
|
|
|
|
t, _ := tags.ParseObjectTags(oi1.UserTags)
|
|
if !reflect.DeepEqual(oi2.UserTags, t.ToMap()) {
|
|
return replicateMetadata
|
|
}
|
|
|
|
// Compare only necessary headers
|
|
compareKeys := []string{
|
|
"Expires",
|
|
"Cache-Control",
|
|
"Content-Language",
|
|
"Content-Disposition",
|
|
"X-Amz-Object-Lock-Mode",
|
|
"X-Amz-Object-Lock-Retain-Until-Date",
|
|
"X-Amz-Object-Lock-Legal-Hold",
|
|
"X-Amz-Website-Redirect-Location",
|
|
"X-Amz-Meta-",
|
|
}
|
|
|
|
// compare metadata on both maps to see if meta is identical
|
|
compareMeta1 := make(map[string]string)
|
|
for k, v := range oi1.UserDefined {
|
|
var found bool
|
|
for _, prefix := range compareKeys {
|
|
if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
|
|
continue
|
|
}
|
|
found = true
|
|
break
|
|
}
|
|
if found {
|
|
compareMeta1[strings.ToLower(k)] = v
|
|
}
|
|
}
|
|
|
|
compareMeta2 := make(map[string]string)
|
|
for k, v := range oi2.Metadata {
|
|
var found bool
|
|
for _, prefix := range compareKeys {
|
|
if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
|
|
continue
|
|
}
|
|
found = true
|
|
break
|
|
}
|
|
if found {
|
|
compareMeta2[strings.ToLower(k)] = strings.Join(v, ",")
|
|
}
|
|
}
|
|
|
|
if !reflect.DeepEqual(compareMeta1, compareMeta2) {
|
|
return replicateMetadata
|
|
}
|
|
|
|
return replicateNone
|
|
}
|
|
|
|
// replicateObject replicates the specified version of the object to destination bucket
|
|
// The source object is then updated to reflect the replication status.
|
|
func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
|
|
z, ok := objectAPI.(*erasureServerPools)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
bucket := objInfo.Bucket
|
|
object := objInfo.Name
|
|
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
|
|
if tgt == nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, cfg.RoleArn))
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
|
|
VersionID: objInfo.VersionID,
|
|
})
|
|
if err != nil {
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
logger.LogIf(ctx, err)
|
|
return
|
|
}
|
|
defer gr.Close() // hold read lock for entire transaction
|
|
|
|
objInfo = gr.ObjInfo
|
|
size, err := objInfo.GetActualSize()
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
|
|
dest := cfg.GetDestination()
|
|
if dest.Bucket == "" {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
|
|
rtype := replicateAll
|
|
oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{
|
|
VersionID: objInfo.VersionID,
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
ReplicationProxyRequest: "false",
|
|
}})
|
|
if err == nil {
|
|
rtype = getReplicationAction(objInfo, oi)
|
|
if rtype == replicateNone {
|
|
// object with same VersionID already exists, replication kicked off by
|
|
// PutObject might have completed.
|
|
return
|
|
}
|
|
}
|
|
replicationStatus := replication.Completed
|
|
if rtype != replicateAll {
|
|
// replicate metadata for object tagging/copy with metadata replacement
|
|
srcOpts := miniogo.CopySrcOptions{
|
|
Bucket: dest.Bucket,
|
|
Object: object,
|
|
VersionID: objInfo.VersionID}
|
|
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
|
|
c := &miniogo.Core{Client: tgt.Client}
|
|
|
|
if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), srcOpts, dstOpts); err != nil {
|
|
replicationStatus = replication.Failed
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
}
|
|
} else {
|
|
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err))
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
|
|
putOpts, err := putReplicationOpts(ctx, dest, objInfo)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err))
|
|
sendEvent(eventArgs{
|
|
EventName: event.ObjectReplicationNotTracked,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
return
|
|
}
|
|
|
|
// Setup bandwidth throttling
|
|
peers, _ := globalEndpoints.peers()
|
|
totalNodesCount := len(peers)
|
|
if totalNodesCount == 0 {
|
|
totalNodesCount = 1 // For standalone erasure coding
|
|
}
|
|
b := target.BandwidthLimit / int64(totalNodesCount)
|
|
var headerSize int
|
|
for k, v := range putOpts.Header() {
|
|
headerSize += len(k) + len(v)
|
|
}
|
|
|
|
// r takes over closing gr.
|
|
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
|
|
if _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, putOpts); err != nil {
|
|
replicationStatus = replication.Failed
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
}
|
|
defer r.Close()
|
|
}
|
|
|
|
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
|
|
if objInfo.UserTags != "" {
|
|
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
|
}
|
|
|
|
// FIXME: add support for missing replication events
|
|
// - event.ObjectReplicationMissedThreshold
|
|
// - event.ObjectReplicationReplicatedAfterThreshold
|
|
var eventName = event.ObjectReplicationComplete
|
|
if replicationStatus == replication.Failed {
|
|
eventName = event.ObjectReplicationFailed
|
|
}
|
|
|
|
// This lower level implementation is necessary to avoid write locks from CopyObject.
|
|
poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
} else {
|
|
if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, objInfo.UserDefined, ObjectOptions{
|
|
VersionID: objInfo.VersionID,
|
|
}); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
}
|
|
}
|
|
sendEvent(eventArgs{
|
|
EventName: eventName,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
}
|
|
|
|
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
|
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
|
|
// Copy on write
|
|
dst := metadata
|
|
var copied bool
|
|
delKey := func(key string) {
|
|
if _, ok := metadata[key]; !ok {
|
|
return
|
|
}
|
|
if !copied {
|
|
dst = make(map[string]string, len(metadata))
|
|
for k, v := range metadata {
|
|
dst[k] = v
|
|
}
|
|
copied = true
|
|
}
|
|
delete(dst, key)
|
|
}
|
|
|
|
delKey(xhttp.AmzBucketReplicationStatus)
|
|
return dst
|
|
}
|
|
|
|
// DeletedObjectVersionInfo has info on deleted object
|
|
type DeletedObjectVersionInfo struct {
|
|
DeletedObject
|
|
Bucket string
|
|
}
|
|
type replicationState struct {
|
|
// add future metrics here
|
|
replicaCh chan ObjectInfo
|
|
replicaDeleteCh chan DeletedObjectVersionInfo
|
|
}
|
|
|
|
func (r *replicationState) queueReplicaTask(oi ObjectInfo) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
select {
|
|
case r.replicaCh <- oi:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (r *replicationState) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
select {
|
|
case r.replicaDeleteCh <- doi:
|
|
default:
|
|
}
|
|
}
|
|
|
|
var (
|
|
globalReplicationState *replicationState
|
|
)
|
|
|
|
func newReplicationState() *replicationState {
|
|
rs := &replicationState{
|
|
replicaCh: make(chan ObjectInfo, 10000),
|
|
replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000),
|
|
}
|
|
go func() {
|
|
<-GlobalContext.Done()
|
|
close(rs.replicaCh)
|
|
close(rs.replicaDeleteCh)
|
|
}()
|
|
return rs
|
|
}
|
|
|
|
// addWorker creates a new worker to process tasks
|
|
func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer) {
|
|
// Add a new worker.
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case oi, ok := <-r.replicaCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateObject(ctx, oi, objectAPI)
|
|
case doi, ok := <-r.replicaDeleteCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateDelete(ctx, doi, objectAPI)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
|
|
if globalReplicationState == nil {
|
|
return
|
|
}
|
|
|
|
// Start replication workers per count set in api config or MINIO_API_REPLICATION_WORKERS.
|
|
for i := 0; i < globalAPIConfig.getReplicationWorkers(); i++ {
|
|
globalReplicationState.addWorker(ctx, objectAPI)
|
|
}
|
|
}
|
|
|
|
// get Reader from replication target if active-active replication is in place and
|
|
// this node returns a 404
|
|
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, proxy bool) {
|
|
tgt, oi, proxy, err := proxyHeadToRepTarget(ctx, bucket, object, opts)
|
|
if !proxy || err != nil {
|
|
return nil, false
|
|
}
|
|
fn, off, length, err := NewGetObjectReader(rs, oi, opts)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
gopts := miniogo.GetObjectOptions{
|
|
VersionID: opts.VersionID,
|
|
ServerSideEncryption: opts.ServerSideEncryption,
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
ReplicationProxyRequest: "true",
|
|
},
|
|
}
|
|
// get correct offsets for encrypted object
|
|
if off >= 0 && length >= 0 {
|
|
if err := gopts.SetRange(off, off+length-1); err != nil {
|
|
return nil, false
|
|
}
|
|
}
|
|
// Make sure to match ETag when proxying.
|
|
if err = gopts.SetMatchETag(oi.ETag); err != nil {
|
|
return nil, false
|
|
}
|
|
c := miniogo.Core{Client: tgt.Client}
|
|
obj, _, _, err := c.GetObject(ctx, bucket, object, gopts)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
closeReader := func() { obj.Close() }
|
|
|
|
reader, err := fn(obj, h, opts.CheckPrecondFn, closeReader)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
reader.ObjInfo = oi.Clone()
|
|
return reader, true
|
|
}
|
|
|
|
// isProxyable returns true if replication config found for this bucket
|
|
func isProxyable(ctx context.Context, bucket string) bool {
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
dest := cfg.GetDestination()
|
|
return dest.Bucket == bucket
|
|
}
|
|
|
|
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (tgt *TargetClient, oi ObjectInfo, proxy bool, err error) {
|
|
// this option is set when active-active replication is in place between site A -> B,
|
|
// and site B does not have the object yet.
|
|
if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
|
|
return nil, oi, false, nil
|
|
}
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
return nil, oi, false, err
|
|
}
|
|
dest := cfg.GetDestination()
|
|
if dest.Bucket != bucket { // not active-active
|
|
return nil, oi, false, err
|
|
}
|
|
ssec := false
|
|
if opts.ServerSideEncryption != nil {
|
|
ssec = opts.ServerSideEncryption.Type() == encrypt.SSEC
|
|
}
|
|
ropts := replication.ObjectOpts{
|
|
Name: object,
|
|
SSEC: ssec,
|
|
}
|
|
if !cfg.Replicate(ropts) { // no matching rule for object prefix
|
|
return nil, oi, false, nil
|
|
}
|
|
tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
|
|
if tgt == nil || tgt.isOffline() {
|
|
return nil, oi, false, fmt.Errorf("target is offline or not configured")
|
|
}
|
|
|
|
gopts := miniogo.GetObjectOptions{
|
|
VersionID: opts.VersionID,
|
|
ServerSideEncryption: opts.ServerSideEncryption,
|
|
Internal: miniogo.AdvancedGetOptions{
|
|
ReplicationProxyRequest: "true",
|
|
},
|
|
}
|
|
|
|
objInfo, err := tgt.StatObject(ctx, dest.Bucket, object, gopts)
|
|
if err != nil {
|
|
return nil, oi, false, err
|
|
}
|
|
|
|
tags, _ := tags.MapToObjectTags(objInfo.UserTags)
|
|
oi = ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: object,
|
|
ModTime: objInfo.LastModified,
|
|
Size: objInfo.Size,
|
|
ETag: objInfo.ETag,
|
|
VersionID: objInfo.VersionID,
|
|
IsLatest: objInfo.IsLatest,
|
|
DeleteMarker: objInfo.IsDeleteMarker,
|
|
ContentType: objInfo.ContentType,
|
|
Expires: objInfo.Expires,
|
|
StorageClass: objInfo.StorageClass,
|
|
ReplicationStatus: replication.StatusType(objInfo.ReplicationStatus),
|
|
UserTags: tags.String(),
|
|
}
|
|
oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
|
|
for k, v := range objInfo.Metadata {
|
|
oi.UserDefined[k] = v[0]
|
|
}
|
|
ce, ok := oi.UserDefined[xhttp.ContentEncoding]
|
|
if !ok {
|
|
ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
|
|
}
|
|
if ok {
|
|
oi.ContentEncoding = ce
|
|
}
|
|
return tgt, oi, true, nil
|
|
}
|
|
|
|
// get object info from replication target if active-active replication is in place and
|
|
// this node returns a 404
|
|
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, proxy bool, err error) {
|
|
_, oi, proxy, err = proxyHeadToRepTarget(ctx, bucket, object, opts)
|
|
return oi, proxy, err
|
|
}
|
|
|
|
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool) {
|
|
if sync {
|
|
replicateObject(ctx, objInfo, o)
|
|
} else {
|
|
globalReplicationState.queueReplicaTask(objInfo)
|
|
}
|
|
}
|
|
|
|
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, o ObjectLayer, sync bool) {
|
|
if sync {
|
|
replicateDelete(ctx, dv, o)
|
|
} else {
|
|
globalReplicationState.queueReplicaDeleteTask(dv)
|
|
}
|
|
}
|