reduce all major allocations in replication path (#18032)

- remove targetClient from being passed around via replicationObjectInfo{}
- remove unnecessary cloning of object info
- remove objectInfo from replicationObjectInfo{} (keep only the necessary fields)
Harshavardhana 2023-09-16 02:28:06 -07:00 committed by GitHub
parent 9fab91852a
commit fa6d082bfd
12 changed files with 373 additions and 238 deletions
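
Before the per-file diffs: the core of the change is that ReplicateObjectInfo no longer embeds a full ObjectInfo (nor carries a *TargetClient inside each target decision); it holds only the fields the replication path actually reads, and converts back with ToObjectInfo() in the few places that still want an ObjectInfo. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the actual MinIO definitions:

package main

import (
	"fmt"
	"time"
)

// ObjectInfo stands in for the full object metadata: many fields plus maps
// and slices that are expensive to copy or clone.
type ObjectInfo struct {
	Bucket, Name, VersionID string
	Size                    int64
	ModTime                 time.Time
	UserDefined             map[string]string
}

// ReplicateObjectInfo carries only what the replication path reads; it embeds
// no ObjectInfo and holds no target client pointer.
type ReplicateObjectInfo struct {
	Bucket, Name, VersionID string
	Size, ActualSize        int64
	ModTime                 time.Time
}

// ToObjectInfo rebuilds a partial ObjectInfo for call sites (events, error
// paths) that still expect one; it is an optimization, not a faithful copy.
func (ri ReplicateObjectInfo) ToObjectInfo() ObjectInfo {
	return ObjectInfo{
		Bucket:      ri.Bucket,
		Name:        ri.Name,
		VersionID:   ri.VersionID,
		Size:        ri.Size,
		ModTime:     ri.ModTime,
		UserDefined: map[string]string{},
	}
}

func main() {
	oi := ObjectInfo{Bucket: "b", Name: "o", Size: 42, ModTime: time.Now(),
		UserDefined: map[string]string{"x-amz-meta-k": "v"}}
	// Build the slim work item once, copying scalars instead of cloning oi.
	ri := ReplicateObjectInfo{Bucket: oi.Bucket, Name: oi.Name, VersionID: oi.VersionID,
		Size: oi.Size, ActualSize: oi.Size, ModTime: oi.ModTime}
	fmt.Printf("replicate %s/%s (%d bytes)\n", ri.Bucket, ri.Name, ri.Size)
	_ = ri.ToObjectInfo() // only where a partial ObjectInfo is still required
}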


@ -49,18 +49,18 @@ test: verifiers build ## builds minio, runs linters, tests
@echo "Running unit tests"
@MINIO_API_REQUESTS_MAX=10000 CGO_ENABLED=0 go test -tags kqueue ./...
test-root-disable: install
test-root-disable: install-race
@echo "Running minio root lockdown tests"
@env bash $(PWD)/buildscripts/disable-root.sh
test-decom: install
test-decom: install-race
@echo "Running minio decom tests"
@env bash $(PWD)/docs/distributed/decom.sh
@env bash $(PWD)/docs/distributed/decom-encrypted.sh
@env bash $(PWD)/docs/distributed/decom-encrypted-sse-s3.sh
@env bash $(PWD)/docs/distributed/decom-compressed-sse-s3.sh
test-upgrade: build
test-upgrade: install-race
@echo "Running minio upgrade tests"
@(env bash $(PWD)/buildscripts/minio-upgrade.sh)
@ -86,18 +86,18 @@ test-replication-3site:
test-delete-replication:
@(env bash $(PWD)/docs/bucket/replication/delete-replication.sh)
test-replication: install test-replication-2site test-replication-3site test-delete-replication test-sio-error ## verify multi site replication
test-replication: install-race test-replication-2site test-replication-3site test-delete-replication test-sio-error ## verify multi site replication
@echo "Running tests for replicating three sites"
test-site-replication-ldap: install ## verify automatic site replication
test-site-replication-ldap: install-race ## verify automatic site replication
@echo "Running tests for automatic site replication of IAM (with LDAP)"
@(env bash $(PWD)/docs/site-replication/run-multi-site-ldap.sh)
test-site-replication-oidc: install ## verify automatic site replication
test-site-replication-oidc: install-race ## verify automatic site replication
@echo "Running tests for automatic site replication of IAM (with OIDC)"
@(env bash $(PWD)/docs/site-replication/run-multi-site-oidc.sh)
test-site-replication-minio: install ## verify automatic site replication
test-site-replication-minio: install-race ## verify automatic site replication
@echo "Running tests for automatic site replication of IAM (with MinIO IDP)"
@(env bash $(PWD)/docs/site-replication/run-multi-site-minio-idp.sh)
@ -159,6 +159,12 @@ docker: build ## builds minio docker container
@echo "Building minio docker image '$(TAG)'"
@docker build -q --no-cache -t $(TAG) . -f Dockerfile
install-race: checks ## builds minio to $(PWD)
@echo "Building minio binary to './minio'"
@GORACE=history_size=7 CGO_ENABLED=1 go build -tags kqueue -race -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
@echo "Installing minio binary to '$(GOPATH)/bin/minio'"
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/minio $(GOPATH)/bin/minio
install: build ## builds minio and installs it to $GOPATH/bin.
@echo "Installing minio binary to '$(GOPATH)/bin/minio'"
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/minio $(GOPATH)/bin/minio


@ -56,6 +56,8 @@ done
set +e
sleep 10
./mc ls minioadm/
if [ $? -ne 0 ]; then
echo "listing failed, 'minioadmin' should be enabled"


@ -32,7 +32,6 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
)
//go:generate msgp -file=$GOFILE
@ -167,7 +166,21 @@ func (ri replicatedInfos) Action() replicationAction {
var replStatusRegex = regexp.MustCompile(`([^=].*?)=([^,].*?);`)
// TargetReplicationStatus - returns replication status of a target
func (o *ObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) {
func (ri ReplicateObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) {
repStatMatches := replStatusRegex.FindAllStringSubmatch(ri.ReplicationStatusInternal, -1)
for _, repStatMatch := range repStatMatches {
if len(repStatMatch) != 3 {
return
}
if repStatMatch[1] == arn {
return replication.StatusType(repStatMatch[2])
}
}
return
}
// TargetReplicationStatus - returns replication status of a target
func (o ObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) {
repStatMatches := replStatusRegex.FindAllStringSubmatch(o.ReplicationStatusInternal, -1)
for _, repStatMatch := range repStatMatches {
if len(repStatMatch) != 3 {
@ -185,7 +198,6 @@ type replicateTargetDecision struct {
Synchronous bool // Synchronous replication configured.
Arn string // ARN of replication target
ID string
Tgt *TargetClient
}
func (t *replicateTargetDecision) String() string {
@ -207,7 +219,7 @@ type ReplicateDecision struct {
}
// ReplicateAny returns true if at least one target qualifies for replication
func (d *ReplicateDecision) ReplicateAny() bool {
func (d ReplicateDecision) ReplicateAny() bool {
for _, t := range d.targetsMap {
if t.Replicate {
return true
@ -217,7 +229,7 @@ func (d *ReplicateDecision) ReplicateAny() bool {
}
// Synchronous returns true if at least one target qualifies for synchronous replication
func (d *ReplicateDecision) Synchronous() bool {
func (d ReplicateDecision) Synchronous() bool {
for _, t := range d.targetsMap {
if t.Synchronous {
return true
@ -226,7 +238,7 @@ func (d *ReplicateDecision) Synchronous() bool {
return false
}
func (d *ReplicateDecision) String() string {
func (d ReplicateDecision) String() string {
b := new(bytes.Buffer)
for key, value := range d.targetsMap {
fmt.Fprintf(b, "%s=%s,", key, value.String())
@ -243,7 +255,7 @@ func (d *ReplicateDecision) Set(t replicateTargetDecision) {
}
// PendingStatus returns a stringified representation of internal replication status with all targets marked as `PENDING`
func (d *ReplicateDecision) PendingStatus() string {
func (d ReplicateDecision) PendingStatus() string {
b := new(bytes.Buffer)
for _, k := range d.targetsMap {
if k.Replicate {
@ -259,11 +271,11 @@ type ResyncDecision struct {
}
// Empty returns true if no targets with resync decision present
func (r *ResyncDecision) Empty() bool {
func (r ResyncDecision) Empty() bool {
return r.targets == nil
}
func (r *ResyncDecision) mustResync() bool {
func (r ResyncDecision) mustResync() bool {
for _, v := range r.targets {
if v.Replicate {
return true
@ -272,15 +284,12 @@ func (r *ResyncDecision) mustResync() bool {
return false
}
func (r *ResyncDecision) mustResyncTarget(tgtArn string) bool {
func (r ResyncDecision) mustResyncTarget(tgtArn string) bool {
if r.targets == nil {
return false
}
v, ok := r.targets[tgtArn]
if ok && v.Replicate {
return true
}
return false
return ok && v.Replicate
}
// ResyncTargetDecision is struct that represents resync decision for this target
@ -301,35 +310,20 @@ func parseReplicateDecision(ctx context.Context, bucket, s string) (r ReplicateD
if len(s) == 0 {
return
}
pairs := strings.Split(s, ",")
for _, p := range pairs {
for _, p := range strings.Split(s, ",") {
if p == "" {
continue
}
slc := strings.Split(p, "=")
if len(slc) != 2 {
return r, errInvalidReplicateDecisionFormat
}
tgtStr := strings.TrimPrefix(slc[1], "\"")
tgtStr = strings.TrimSuffix(tgtStr, "\"")
tgtStr := strings.TrimSuffix(strings.TrimPrefix(slc[1], `"`), `"`)
tgt := strings.Split(tgtStr, ";")
if len(tgt) != 4 {
return r, errInvalidReplicateDecisionFormat
}
var replicate, sync bool
var err error
replicate, err = strconv.ParseBool(tgt[0])
if err != nil {
return r, err
}
sync, err = strconv.ParseBool(tgt[1])
if err != nil {
return r, err
}
tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(slc[0])
if tgtClnt == nil {
// Skip stale targets if any and log them to be missing atleast once.
logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, slc[0]), slc[0])
// We save the targetDecision even when its not configured or stale.
}
r.targetsMap[slc[0]] = replicateTargetDecision{Replicate: replicate, Synchronous: sync, Arn: tgt[2], ID: tgt[3], Tgt: tgtClnt}
r.targetsMap[slc[0]] = replicateTargetDecision{Replicate: tgt[0] == "true", Synchronous: tgt[1] == "true", Arn: tgt[2], ID: tgt[3]}
}
return
}
@ -496,8 +490,8 @@ func getCompositeVersionPurgeStatus(m map[string]VersionPurgeStatusType) Version
}
// getHealReplicateObjectInfo returns info needed by heal replication in ReplicateObjectInfo
func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) ReplicateObjectInfo {
oi := objInfo.Clone()
func getHealReplicateObjectInfo(oi ObjectInfo, rcfg replicationConfig) ReplicateObjectInfo {
userDefined := cloneMSS(oi.UserDefined)
if rcfg.Config != nil && rcfg.Config.RoleArn != "" {
// For backward compatibility of objects pending/failed replication.
// Save replication related statuses in the new internal representation for
@ -508,17 +502,15 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl
if !oi.VersionPurgeStatus.Empty() {
oi.VersionPurgeStatusInternal = fmt.Sprintf("%s=%s;", rcfg.Config.RoleArn, oi.VersionPurgeStatus)
}
for k, v := range oi.UserDefined {
for k, v := range userDefined {
if strings.EqualFold(k, ReservedMetadataPrefixLower+ReplicationReset) {
delete(oi.UserDefined, k)
oi.UserDefined[targetResetHeader(rcfg.Config.RoleArn)] = v
delete(userDefined, k)
userDefined[targetResetHeader(rcfg.Config.RoleArn)] = v
}
}
}
var dsc ReplicateDecision
var tgtStatuses map[string]replication.StatusType
var purgeStatuses map[string]VersionPurgeStatusType
var dsc ReplicateDecision
if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
dsc = checkReplicateDelete(GlobalContext, oi.Bucket, ObjectToDelete{
ObjectV: ObjectV{
@ -530,16 +522,31 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl
VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name),
}, nil)
} else {
dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(ObjectInfo{
UserDefined: oi.UserDefined,
}, replication.HealReplicationType, ObjectOptions{}))
dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(userDefined, oi.UserTags, "", replication.HealReplicationType, ObjectOptions{}))
}
tgtStatuses = replicationStatusesMap(oi.ReplicationStatusInternal)
purgeStatuses = versionPurgeStatusesMap(oi.VersionPurgeStatusInternal)
existingObjResync := rcfg.Resync(GlobalContext, oi, &dsc, tgtStatuses)
tm, _ := time.Parse(time.RFC3339Nano, oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp])
tgtStatuses := replicationStatusesMap(oi.ReplicationStatusInternal)
purgeStatuses := versionPurgeStatusesMap(oi.VersionPurgeStatusInternal)
existingObjResync := rcfg.Resync(GlobalContext, oi, dsc, tgtStatuses)
tm, _ := time.Parse(time.RFC3339Nano, userDefined[ReservedMetadataPrefixLower+ReplicationTimestamp])
rstate := oi.ReplicationState()
rstate.ReplicateDecisionStr = dsc.String()
asz, _ := oi.GetActualSize()
return ReplicateObjectInfo{
ObjectInfo: oi,
Name: oi.Name,
Size: oi.Size,
ActualSize: asz,
Bucket: oi.Bucket,
VersionID: oi.VersionID,
ModTime: oi.ModTime,
ReplicationStatus: oi.ReplicationStatus,
ReplicationStatusInternal: oi.ReplicationStatusInternal,
DeleteMarker: oi.DeleteMarker,
VersionPurgeStatusInternal: oi.VersionPurgeStatusInternal,
VersionPurgeStatus: oi.VersionPurgeStatus,
ReplicationState: rstate,
OpType: replication.HealReplicationType,
Dsc: dsc,
ExistingObjResync: existingObjResync,
@ -549,14 +556,8 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl
}
}
func (ri *ReplicateObjectInfo) getReplicationState() ReplicationState {
rs := ri.ObjectInfo.getReplicationState()
rs.ReplicateDecisionStr = ri.Dsc.String()
return rs
}
// vID here represents the versionID client specified in request - need to distinguish between delete marker and delete marker deletion
func (o *ObjectInfo) getReplicationState() ReplicationState {
// ReplicationState - returns replication state using other internal replication metadata in ObjectInfo
func (o ObjectInfo) ReplicationState() ReplicationState {
rs := ReplicationState{
ReplicationStatusInternal: o.ReplicationStatusInternal,
VersionPurgeStatusInternal: o.VersionPurgeStatusInternal,
@ -577,7 +578,7 @@ func (o *ObjectInfo) getReplicationState() ReplicationState {
}
// ReplicationState returns replication state using other internal replication metadata in ObjectToDelete
func (o *ObjectToDelete) ReplicationState() ReplicationState {
func (o ObjectToDelete) ReplicationState() ReplicationState {
r := ReplicationState{
ReplicationStatusInternal: o.DeleteMarkerReplicationStatus,
VersionPurgeStatusInternal: o.VersionPurgeStatuses,


@ -224,21 +224,19 @@ func (o mustReplicateOptions) isMetadataReplication() bool {
return o.opType == replication.MetadataReplicationType
}
func getMustReplicateOptions(o ObjectInfo, op replication.Type, opts ObjectOptions) mustReplicateOptions {
if !op.Valid() {
op = replication.ObjectReplicationType
if o.metadataOnly {
op = replication.MetadataReplicationType
}
}
meta := cloneMSS(o.UserDefined)
if o.UserTags != "" {
meta[xhttp.AmzObjectTagging] = o.UserTags
func (o ObjectInfo) getMustReplicateOptions(op replication.Type, opts ObjectOptions) mustReplicateOptions {
return getMustReplicateOptions(o.UserDefined, o.UserTags, o.ReplicationStatus, op, opts)
}
func getMustReplicateOptions(userDefined map[string]string, userTags string, status replication.StatusType, op replication.Type, opts ObjectOptions) mustReplicateOptions {
meta := cloneMSS(userDefined)
if userTags != "" {
meta[xhttp.AmzObjectTagging] = userTags
}
return mustReplicateOptions{
meta: meta,
status: o.ReplicationStatus,
status: status,
opType: op,
replicationRequest: opts.ReplicationRequest,
}
@ -356,40 +354,41 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
OpType: replication.DeleteReplicationType,
}
tgtArns := rcfg.FilterTargetArns(opts)
if len(tgtArns) > 0 {
dsc.targetsMap = make(map[string]replicateTargetDecision, len(tgtArns))
var sync, replicate bool
for _, tgtArn := range tgtArns {
opts.TargetArn = tgtArn
replicate = rcfg.Replicate(opts)
// when incoming delete is removal of a delete marker(a.k.a versioned delete),
// GetObjectInfo returns extra information even though it returns errFileNotFound
if gerr != nil {
validReplStatus := false
switch oi.TargetReplicationStatus(tgtArn) {
case replication.Pending, replication.Completed, replication.Failed:
validReplStatus = true
}
if oi.DeleteMarker && (validReplStatus || replicate) {
dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
continue
} else {
// can be the case that other cluster is down and duplicate `mc rm --vid`
// is issued - this still needs to be replicated back to the other target
replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed
dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
continue
}
dsc.targetsMap = make(map[string]replicateTargetDecision, len(tgtArns))
if len(tgtArns) == 0 {
return dsc
}
var sync, replicate bool
for _, tgtArn := range tgtArns {
opts.TargetArn = tgtArn
replicate = rcfg.Replicate(opts)
// when incoming delete is removal of a delete marker(a.k.a versioned delete),
// GetObjectInfo returns extra information even though it returns errFileNotFound
if gerr != nil {
validReplStatus := false
switch oi.TargetReplicationStatus(tgtArn) {
case replication.Pending, replication.Completed, replication.Failed:
validReplStatus = true
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn)
// the target online status should not be used here while deciding
// whether to replicate deletes as the target could be temporarily down
tgtDsc := newReplicateTargetDecision(tgtArn, false, false)
if tgt != nil {
tgtDsc = newReplicateTargetDecision(tgtArn, replicate, tgt.replicateSync)
if oi.DeleteMarker && (validReplStatus || replicate) {
dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
continue
} else {
// can be the case that other cluster is down and duplicate `mc rm --vid`
// is issued - this still needs to be replicated back to the other target
replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed
dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
continue
}
dsc.Set(tgtDsc)
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn)
// the target online status should not be used here while deciding
// whether to replicate deletes as the target could be temporarily down
tgtDsc := newReplicateTargetDecision(tgtArn, false, false)
if tgt != nil {
tgtDsc = newReplicateTargetDecision(tgtArn, replicate, tgt.replicateSync)
}
dsc.Set(tgtDsc)
}
return dsc
}
@ -483,15 +482,10 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
rinfos := replicatedInfos{Targets: make([]replicatedTargetInfo, 0, len(dsc.targetsMap))}
var wg sync.WaitGroup
var rinfos replicatedInfos
rinfos.Targets = make([]replicatedTargetInfo, len(dsc.targetsMap))
idx := -1
var mu sync.Mutex
for _, tgtEntry := range dsc.targetsMap {
idx++
if tgtEntry.Tgt == nil {
continue
}
if !tgtEntry.Replicate {
continue
}
@ -499,11 +493,33 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
if dobj.TargetArn != "" && dobj.TargetArn != tgtEntry.Arn {
continue
}
tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(tgtEntry.Arn)
if tgtClnt == nil {
// Skip stale targets if any and log them to be missing at least once.
logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtEntry.Arn), tgtEntry.Arn)
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName,
})
continue
}
wg.Add(1)
go func(index int, tgt *TargetClient) {
go func(tgt *TargetClient) {
defer wg.Done()
rinfos.Targets[index] = replicateDeleteToTarget(ctx, dobj, tgt)
}(idx, tgtEntry.Tgt)
tgtInfo := replicateDeleteToTarget(ctx, dobj, tgt)
mu.Lock()
rinfos.Targets = append(rinfos.Targets, tgtInfo)
mu.Unlock()
}(tgtClnt)
}
wg.Wait()
@ -963,9 +979,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
})
}()
objInfo := ri.ObjectInfo
bucket := objInfo.Bucket
object := objInfo.Name
bucket := ri.Bucket
object := ri.Name
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
@ -973,7 +988,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Object: ri.ToObjectInfo(),
UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName,
})
@ -981,8 +996,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
}
tgtArns := cfg.FilterTargetArns(replication.ObjectOpts{
Name: object,
SSEC: crypto.SSEC.IsEncrypted(objInfo.UserDefined),
UserTags: objInfo.UserTags,
SSEC: ri.SSEC,
UserTags: ri.UserTags,
})
// Lock the object name before starting replication.
// Use separate lock that doesn't collide with regular objects.
@ -992,7 +1007,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Object: ri.ToObjectInfo(),
UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName,
})
@ -1002,32 +1017,38 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
rinfos := replicatedInfos{Targets: make([]replicatedTargetInfo, 0, len(tgtArns))}
var wg sync.WaitGroup
var rinfos replicatedInfos
rinfos.Targets = make([]replicatedTargetInfo, len(tgtArns))
for i, tgtArn := range tgtArns {
var mu sync.Mutex
for _, tgtArn := range tgtArns {
tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn)
if tgt == nil {
logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn), tgtArn)
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Object: ri.ToObjectInfo(),
UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName,
})
continue
}
wg.Add(1)
go func(index int, tgt *TargetClient) {
go func(tgt *TargetClient) {
defer wg.Done()
var tgtInfo replicatedTargetInfo
if ri.OpType == replication.ObjectReplicationType {
// all incoming calls go through optimized path.
rinfos.Targets[index] = ri.replicateObject(ctx, objectAPI, tgt)
tgtInfo = ri.replicateObject(ctx, objectAPI, tgt)
} else {
rinfos.Targets[index] = ri.replicateAll(ctx, objectAPI, tgt)
tgtInfo = ri.replicateAll(ctx, objectAPI, tgt)
}
}(i, tgt)
mu.Lock()
rinfos.Targets = append(rinfos.Targets, tgtInfo)
mu.Unlock()
}(tgt)
}
wg.Wait()
@ -1042,10 +1063,11 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
newReplStatusInternal := rinfos.ReplicationStatusInternal()
// Note that internal replication status(es) may match for previously replicated objects - in such cases
// metadata should be updated with last resync timestamp.
if objInfo.ReplicationStatusInternal != newReplStatusInternal || rinfos.ReplicationResynced() {
objInfo := ri.ToObjectInfo()
if ri.ReplicationStatusInternal != newReplStatusInternal || rinfos.ReplicationResynced() {
popts := ObjectOptions{
MTime: objInfo.ModTime,
VersionID: objInfo.VersionID,
MTime: ri.ModTime,
VersionID: ri.VersionID,
EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) {
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = newReplStatusInternal
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
@ -1055,14 +1077,18 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
oi.UserDefined[targetResetHeader(rinfo.Arn)] = rinfo.ResyncTimestamp
}
}
if objInfo.UserTags != "" {
oi.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
if ri.UserTags != "" {
oi.UserDefined[xhttp.AmzObjectTagging] = ri.UserTags
}
return dsc, nil
},
}
_, _ = objectAPI.PutObjectMetadata(ctx, bucket, object, popts)
uobjInfo, _ := objectAPI.PutObjectMetadata(ctx, bucket, object, popts)
if uobjInfo.Name != "" {
objInfo = uobjInfo
}
opType := replication.MetadataReplicationType
if rinfos.Action() == replicateAll {
opType = replication.ObjectReplicationType
@ -1098,23 +1124,21 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
// The source object is then updated to reflect the replication status.
func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
startTime := time.Now()
objInfo := ri.ObjectInfo.Clone()
bucket := objInfo.Bucket
object := objInfo.Name
sz, _ := objInfo.GetActualSize()
bucket := ri.Bucket
object := ri.Name
rAction := replicateAll
rinfo = replicatedTargetInfo{
Size: sz,
Size: ri.ActualSize,
Arn: tgt.ARN,
PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
PrevReplicationStatus: ri.TargetReplicationStatus(tgt.ARN),
ReplicationStatus: replication.Failed,
OpType: ri.OpType,
ReplicationAction: rAction,
endpoint: tgt.EndpointURL().Host,
secure: tgt.EndpointURL().Scheme == "https",
}
if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
if ri.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
rinfo.ReplicationStatus = replication.Completed
rinfo.ReplicationResynced = true
return
@ -1125,7 +1149,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Object: ri.ToObjectInfo(),
UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName,
})
@ -1136,12 +1160,13 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{
VersionID: objInfo.VersionID,
VersionID: ri.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
})
if err != nil {
if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
objInfo := ri.ToObjectInfo()
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
@ -1155,7 +1180,8 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
}
defer gr.Close()
objInfo = gr.ObjInfo
objInfo := gr.ObjInfo
// make sure we have the latest metadata for metrics calculation
rinfo.PrevReplicationStatus = objInfo.TargetReplicationStatus(tgt.ARN)
@ -1217,7 +1243,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
opts := &bandwidth.MonitorReaderOptions{
BucketOptions: bandwidth.BucketOptions{
Name: objInfo.Bucket,
Name: ri.Bucket,
ReplicationARN: tgt.ARN,
},
HeaderSize: headerSize,
@ -1256,10 +1282,8 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
// The source object is then updated to reflect the replication status.
func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
startTime := time.Now()
objInfo := ri.ObjectInfo.Clone()
bucket := objInfo.Bucket
object := objInfo.Name
sz, _ := objInfo.GetActualSize()
bucket := ri.Bucket
object := ri.Name
// set defaults for replication action based on operation being performed - actual
// replication action can only be determined after stat on remote. This default is
@ -1267,9 +1291,9 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
rAction := replicateMetadata
rinfo = replicatedTargetInfo{
Size: sz,
Size: ri.ActualSize,
Arn: tgt.ARN,
PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
PrevReplicationStatus: ri.TargetReplicationStatus(tgt.ARN),
ReplicationStatus: replication.Failed,
OpType: ri.OpType,
ReplicationAction: rAction,
@ -1282,7 +1306,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Object: ri.ToObjectInfo(),
UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName,
})
@ -1293,12 +1317,13 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{
VersionID: objInfo.VersionID,
VersionID: ri.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
})
if err != nil {
if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
objInfo := ri.ToObjectInfo()
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
@ -1312,7 +1337,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
}
defer gr.Close()
objInfo = gr.ObjInfo
objInfo := gr.ObjInfo
// make sure we have the latest metadata for metrics calculation
rinfo.PrevReplicationStatus = objInfo.TargetReplicationStatus(tgt.ARN)
@ -1379,7 +1404,9 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
}
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed
if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending || objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType {
if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending ||
objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed ||
ri.OpType == replication.ExistingObjectReplicationType {
// if metadata is not updated for some reason after replication, such as
// 503 encountered while updating metadata - make sure to set ReplicationStatus
// as Completed.
@ -2233,8 +2260,35 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, rs
return oi, proxy
}
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) {
ri := ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc, EventType: ReplicateIncoming}
func scheduleReplication(ctx context.Context, oi ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) {
tgtStatuses := replicationStatusesMap(oi.ReplicationStatusInternal)
purgeStatuses := versionPurgeStatusesMap(oi.VersionPurgeStatusInternal)
tm, _ := time.Parse(time.RFC3339Nano, oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp])
rstate := oi.ReplicationState()
rstate.ReplicateDecisionStr = dsc.String()
asz, _ := oi.GetActualSize()
ri := ReplicateObjectInfo{
Name: oi.Name,
Size: oi.Size,
ActualSize: asz,
Bucket: oi.Bucket,
VersionID: oi.VersionID,
ModTime: oi.ModTime,
ReplicationStatus: oi.ReplicationStatus,
ReplicationStatusInternal: oi.ReplicationStatusInternal,
DeleteMarker: oi.DeleteMarker,
VersionPurgeStatusInternal: oi.VersionPurgeStatusInternal,
VersionPurgeStatus: oi.VersionPurgeStatus,
ReplicationState: rstate,
OpType: opType,
Dsc: dsc,
TargetStatuses: tgtStatuses,
TargetPurgeStatuses: purgeStatuses,
ReplicationTimestamp: tm,
}
if dsc.Synchronous() {
replicateObject(ctx, ri, o)
} else {
@ -2263,7 +2317,7 @@ func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
}
// Resync returns true if replication reset is requested
func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
if c.Empty() {
return
}
@ -2272,8 +2326,6 @@ func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *Repli
if oi.DeleteMarker {
opts := replication.ObjectOpts{
Name: oi.Name,
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
UserTags: oi.UserTags,
DeleteMarker: oi.DeleteMarker,
VersionID: oi.VersionID,
OpType: replication.DeleteReplicationType,
@ -2294,23 +2346,19 @@ func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *Repli
}
// Ignore previous replication status when deciding if object can be re-replicated
objInfo := oi.Clone()
objInfo.ReplicationStatusInternal = ""
objInfo.VersionPurgeStatusInternal = ""
objInfo.ReplicationStatus = ""
objInfo.VersionPurgeStatus = ""
delete(objInfo.UserDefined, xhttp.AmzBucketReplicationStatus)
resyncdsc := mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(objInfo, replication.ExistingObjectReplicationType, ObjectOptions{}))
dsc = &resyncdsc
return c.resync(oi, dsc, tgtStatuses)
userDefined := cloneMSS(oi.UserDefined)
delete(userDefined, xhttp.AmzBucketReplicationStatus)
rdsc := mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(userDefined, oi.UserTags, "", replication.ExistingObjectReplicationType, ObjectOptions{}))
return c.resync(oi, rdsc, tgtStatuses)
}
// wrapper function for testability. Returns true if a new reset is requested on
// already replicated objects OR object qualifies for existing object replication
// and no reset requested.
func (c replicationConfig) resync(oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
func (c replicationConfig) resync(oi ObjectInfo, dsc ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
r = ResyncDecision{
targets: make(map[string]ResyncTargetDecision),
targets: make(map[string]ResyncTargetDecision, len(dsc.targetsMap)),
}
if c.remotes == nil {
return
@ -2567,7 +2615,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
ObjectName: roi.Name,
DeleteMarkerVersionID: dmVersionID,
VersionID: versionID,
ReplicationState: roi.getReplicationState(),
ReplicationState: roi.ReplicationState,
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
DeleteMarker: roi.DeleteMarker,
},
@ -3013,7 +3061,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
ObjectName: roi.Name,
DeleteMarkerVersionID: dmVersionID,
VersionID: versionID,
ReplicationState: roi.getReplicationState(),
ReplicationState: roi.ReplicationState,
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
DeleteMarker: roi.DeleteMarker,
},


@ -88,7 +88,7 @@ var replicationConfigTests = []struct {
func TestReplicationResync(t *testing.T) {
ctx := context.Background()
for i, test := range replicationConfigTests {
if sync := test.rcfg.Resync(ctx, test.info, &test.dsc, test.tgtStatuses); sync.mustResync() != test.expectedSync {
if sync := test.rcfg.Resync(ctx, test.info, test.dsc, test.tgtStatuses); sync.mustResync() != test.expectedSync {
t.Errorf("Test%d (%s): Resync got %t , want %t", i+1, test.name, sync.mustResync(), test.expectedSync)
}
}
@ -283,7 +283,7 @@ var (
func TestReplicationResyncwrapper(t *testing.T) {
for i, test := range replicationConfigTests2 {
if sync := test.rcfg.resync(test.info, &test.dsc, test.tgtStatuses); sync.mustResync() != test.expectedSync {
if sync := test.rcfg.resync(test.info, test.dsc, test.tgtStatuses); sync.mustResync() != test.expectedSync {
t.Errorf("%s (%s): Replicationresync got %t , want %t", fmt.Sprintf("Test%d - %s", i+1, time.Now().Format(http.TimeFormat)), test.name, sync.mustResync(), test.expectedSync)
}
}


@ -728,7 +728,7 @@ func (d *DecryptBlocksReader) Read(p []byte) (int, error) {
// DecryptedSize returns the size of the object after decryption in bytes.
// It returns an error if the object is not encrypted or marked as encrypted
// but has an invalid size.
func (o *ObjectInfo) DecryptedSize() (int64, error) {
func (o ObjectInfo) DecryptedSize() (int64, error) {
if _, ok := crypto.IsEncrypted(o.UserDefined); !ok {
return 0, errors.New("Cannot compute decrypted size of an unencrypted object")
}


@ -68,6 +68,34 @@ type PoolDecommissionInfo struct {
BytesFailed int64 `json:"bytesDecommissionedFailed" msg:"bf"`
}
// Clone makes a copy of PoolDecommissionInfo
func (pd *PoolDecommissionInfo) Clone() *PoolDecommissionInfo {
if pd == nil {
return nil
}
if pd.StartTime.IsZero() {
return nil
}
return &PoolDecommissionInfo{
StartTime: pd.StartTime,
StartSize: pd.StartSize,
TotalSize: pd.TotalSize,
CurrentSize: pd.CurrentSize,
Complete: pd.Complete,
Failed: pd.Failed,
Canceled: pd.Canceled,
QueuedBuckets: pd.QueuedBuckets,
DecommissionedBuckets: pd.DecommissionedBuckets,
Bucket: pd.Bucket,
Prefix: pd.Prefix,
Object: pd.Object,
ItemsDecommissioned: pd.ItemsDecommissioned,
ItemsDecommissionFailed: pd.ItemsDecommissionFailed,
BytesDone: pd.BytesDone,
BytesFailed: pd.BytesFailed,
}
}
// bucketPop should be called when a bucket is done decommissioning.
// Adds the bucket to the list of decommissioned buckets and updates resume numbers.
func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
@ -118,6 +146,16 @@ type PoolStatus struct {
Decommission *PoolDecommissionInfo `json:"decommissionInfo,omitempty" msg:"dec"`
}
// Clone returns a copy of PoolStatus
func (ps PoolStatus) Clone() PoolStatus {
return PoolStatus{
ID: ps.ID,
CmdLine: ps.CmdLine,
LastUpdate: ps.LastUpdate,
Decommission: ps.Decommission.Clone(),
}
}
//go:generate msgp -file $GOFILE -unexported
type poolMeta struct {
Version int `msg:"v"`
@ -375,16 +413,17 @@ func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasure
func (p *poolMeta) CountItem(idx int, size int64, failed bool) {
pd := p.Pools[idx].Decommission
if pd != nil {
if failed {
pd.ItemsDecommissionFailed++
pd.BytesFailed += size
} else {
pd.ItemsDecommissioned++
pd.BytesDone += size
}
p.Pools[idx].Decommission = pd
if pd == nil {
return
}
if failed {
pd.ItemsDecommissionFailed++
pd.BytesFailed += size
} else {
pd.ItemsDecommissioned++
pd.BytesDone += size
}
p.Pools[idx].Decommission = pd
}
func (p *poolMeta) updateAfter(ctx context.Context, idx int, pools []*erasureSets, duration time.Duration) (bool, error) {
@ -1185,15 +1224,15 @@ func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, e
return PoolStatus{}, errInvalidArgument
}
z.poolMetaMutex.RLock()
defer z.poolMetaMutex.RUnlock()
pi, err := z.getDecommissionPoolSpaceInfo(idx)
if err != nil {
return PoolStatus{}, err
}
poolInfo := z.poolMeta.Pools[idx]
z.poolMetaMutex.RLock()
defer z.poolMetaMutex.RUnlock()
poolInfo := z.poolMeta.Pools[idx].Clone()
if poolInfo.Decommission != nil {
poolInfo.Decommission.TotalSize = pi.Total
if poolInfo.Decommission.Failed || poolInfo.Decommission.Canceled {


@ -109,6 +109,9 @@ type ObjectInfo struct {
// Total object size.
Size int64
// Actual size is the real size of the object uploaded by client.
ActualSize *int64
// IsDir indicates if the object is prefix.
IsDir bool
@ -282,9 +285,44 @@ func (o ObjectInfo) tierStats() tierStats {
return ts
}
// ToObjectInfo converts a replication object info to a partial ObjectInfo
// do not rely on this function to give you a correct ObjectInfo, this
// function is merely an optimization.
func (ri ReplicateObjectInfo) ToObjectInfo() ObjectInfo {
return ObjectInfo{
Name: ri.Name,
Bucket: ri.Bucket,
VersionID: ri.VersionID,
ModTime: ri.ModTime,
UserTags: ri.UserTags,
Size: ri.Size,
ActualSize: &ri.ActualSize,
ReplicationStatus: ri.ReplicationStatus,
ReplicationStatusInternal: ri.ReplicationStatusInternal,
VersionPurgeStatus: ri.VersionPurgeStatus,
VersionPurgeStatusInternal: ri.VersionPurgeStatusInternal,
DeleteMarker: true,
UserDefined: map[string]string{},
}
}
// ReplicateObjectInfo represents object info to be replicated
type ReplicateObjectInfo struct {
ObjectInfo
Name string
Bucket string
VersionID string
Size int64
ActualSize int64
ModTime time.Time
UserTags string
SSEC bool
ReplicationStatus replication.StatusType
ReplicationStatusInternal string
VersionPurgeStatusInternal string
VersionPurgeStatus VersionPurgeStatusType
ReplicationState ReplicationState
DeleteMarker bool
OpType replication.Type
EventType string
RetryCount uint32
@ -529,7 +567,7 @@ type PartInfo struct {
// Size in bytes of the part.
Size int64
// Decompressed Size.
// Real size of the object uploaded by client.
ActualSize int64
// Checksum values


@ -507,7 +507,10 @@ func (o *ObjectInfo) IsCompressedOK() (bool, error) {
}
// GetActualSize - returns the actual size of the stored object
func (o *ObjectInfo) GetActualSize() (int64, error) {
func (o ObjectInfo) GetActualSize() (int64, error) {
if o.ActualSize != nil {
return *o.ActualSize, nil
}
if o.IsCompressed() {
sizeStr, ok := o.UserDefined[ReservedMetadataPrefix+"actual-size"]
if !ok {

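The GetActualSize hunk above also shows the new ActualSize pointer acting as a memoized value: once the decompressed size is known it is cached, so later calls skip the metadata lookup and parsing. A small standalone sketch of that pattern, with simplified types and an illustrative metadata key (the real code reads ReservedMetadataPrefix+"actual-size"):

package main

import (
	"fmt"
	"strconv"
)

type objectInfo struct {
	Size        int64
	ActualSize  *int64            // cached decompressed size; nil until known
	UserDefined map[string]string // stored object metadata
}

// getActualSize prefers the cached value and only falls back to parsing the
// stored metadata entry when no cache is present.
func (o objectInfo) getActualSize() (int64, error) {
	if o.ActualSize != nil {
		return *o.ActualSize, nil // fast path: no map lookup, no strconv
	}
	// Illustrative key; the actual code uses ReservedMetadataPrefix+"actual-size".
	if s, ok := o.UserDefined["x-minio-internal-actual-size"]; ok {
		return strconv.ParseInt(s, 10, 64)
	}
	return o.Size, nil
}

func main() {
	cached := int64(1024)
	fast := objectInfo{ActualSize: &cached}
	slow := objectInfo{UserDefined: map[string]string{"x-minio-internal-actual-size": "2048"}}
	a, _ := fast.getActualSize()
	b, _ := slow.getActualSize()
	fmt.Println(a, b) // 1024 2048
}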

@ -1427,7 +1427,12 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano)
srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs
}
if dsc := mustReplicate(ctx, dstBucket, dstObject, getMustReplicateOptions(srcInfo, replication.UnsetReplicationType, dstOpts)); dsc.ReplicateAny() {
op := replication.ObjectReplicationType
if srcInfo.metadataOnly {
op = replication.MetadataReplicationType
}
if dsc := mustReplicate(ctx, dstBucket, dstObject, srcInfo.getMustReplicateOptions(op, dstOpts)); dsc.ReplicateAny() {
srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
}
@ -1500,6 +1505,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL)
return
}
objInfo.UserDefined = cloneMSS(opts.UserMetadata)
objInfo.ETag = remoteObjInfo.ETag
objInfo.ModTime = remoteObjInfo.LastModified
} else {
@ -1533,8 +1539,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
if dsc := mustReplicate(ctx, dstBucket, dstObject, getMustReplicateOptions(objInfo, replication.UnsetReplicationType, dstOpts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType)
if dsc := mustReplicate(ctx, dstBucket, dstObject, objInfo.getMustReplicateOptions(replication.ObjectReplicationType, dstOpts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
}
setPutObjHeaders(w, objInfo, false)
@ -1815,9 +1821,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{
UserDefined: metadata,
}, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
@ -1923,10 +1927,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
}
}
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{
UserDefined: metadata,
}, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType)
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
}
setPutObjHeaders(w, objInfo, false)
@ -2182,9 +2184,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
return ObjectLocked{}
}
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{
UserDefined: metadata,
}, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
@ -2235,10 +2235,8 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
return err
}
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{
UserDefined: metadata,
}, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType)
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
}
// Notify object created event.
@ -2453,7 +2451,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
DeleteMarkerVersionID: dmVersionID,
DeleteMarkerMTime: DeleteMarkerMTime{objInfo.ModTime},
DeleteMarker: objInfo.DeleteMarker,
ReplicationState: objInfo.getReplicationState(),
ReplicationState: objInfo.ReplicationState(),
},
Bucket: bucket,
EventType: ReplicateIncomingDelete,
@ -2528,7 +2526,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = UTCNow().Format(time.RFC3339Nano)
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(*oi, replication.MetadataReplicationType, opts))
dsc := mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
@ -2543,9 +2541,9 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
return
}
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType, opts))
dsc := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.MetadataReplicationType)
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.MetadataReplicationType)
}
writeSuccessResponseHeadersOnly(w)
@ -2697,7 +2695,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = ""
}
oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano)
dsc = mustReplicate(ctx, bucket, object, getMustReplicateOptions(*oi, replication.MetadataReplicationType, opts))
dsc = mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
@ -2712,9 +2710,9 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
return
}
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType, opts))
dsc := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.MetadataReplicationType)
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.MetadataReplicationType)
}
writeSuccessResponseHeadersOnly(w)
@ -2923,9 +2921,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
}
tagsStr := tags.String()
oi := objInfo.Clone()
oi.UserTags = tagsStr
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType, opts))
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo.UserDefined, tagsStr, objInfo.ReplicationStatus, replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
opts.UserDefined = make(map[string]string)
opts.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
@ -2941,7 +2937,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
}
if dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objAPI, dsc, replication.MetadataReplicationType)
scheduleReplication(ctx, objInfo, objAPI, dsc, replication.MetadataReplicationType)
}
if objInfo.VersionID != "" && objInfo.VersionID != nullVersionID {
@ -3003,7 +2999,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
return
}
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType, opts))
dsc := mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
opts.UserDefined = make(map[string]string)
opts.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
@ -3017,7 +3013,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
}
if dsc.ReplicateAny() {
scheduleReplication(ctx, oi.Clone(), objAPI, dsc, replication.MetadataReplicationType)
scheduleReplication(ctx, oi, objAPI, dsc, replication.MetadataReplicationType)
}
if oi.VersionID != "" && oi.VersionID != nullVersionID {


@ -164,9 +164,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{
UserDefined: metadata,
}, replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() {
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() {
metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
@ -997,8 +995,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
setPutObjHeaders(w, objInfo, false)
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType)
if dsc := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
}
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
actualSize, _ := objInfo.GetActualSize()


@ -18,7 +18,7 @@ export CI=true
(minio server /tmp/xl/{1...10}/disk{0...1} 2>&1 >/tmp/decom.log) &
pid=$!
sleep 2
sleep 10
export MC_HOST_myminio="http://minioadmin:minioadmin@localhost:9000/"
@ -48,26 +48,30 @@ policy_count=$(./mc admin policy list myminio/ | wc -l)
## create a warm tier instance
(minio server /tmp/xltier/{1...4}/disk{0...1} --address :9001 2>&1 >/dev/null) &
sleep 2
sleep 10
export MC_HOST_mytier="http://minioadmin:minioadmin@localhost:9001/"
./mc mb -l myminio/bucket2
./mc mb -l mytier/tiered
## create a tier and set up ilm policy to tier immediately
./mc admin tier add minio myminio TIER1 --endpoint http://localhost:9001 --access-key minioadmin --secret-key minioadmin --bucket tiered --prefix prefix5/
./mc ilm add myminio/bucket2 --transition-days 0 --transition-tier TIER1 --transition-days 0
## mirror some content to bucket2 and capture versions tiered
./mc mirror internal myminio/bucket2/ --quiet >/dev/null
./mc ls -r myminio/bucket2/ >bucket2_ns.txt
./mc ls -r --versions myminio/bucket2/ >bucket2_ns_versions.txt
sleep 2
sleep 10
./mc ls -r --versions mytier/tiered/ >tiered_ns_versions.txt
kill $pid
(minio server /tmp/xl/{1...10}/disk{0...1} /tmp/xl/{11...30}/disk{0...3} 2>&1 >/tmp/expanded.log) &
pid=$!
sleep 2
sleep 10
expanded_user_count=$(./mc admin user list myminio/ | wc -l)
expanded_policy_count=$(./mc admin policy list myminio/ | wc -l)
@ -106,7 +110,7 @@ kill $pid
(minio server /tmp/xl/{11...30}/disk{0...3} 2>&1 >/dev/null) &
pid=$!
sleep 2
sleep 10
decom_user_count=$(./mc admin user list myminio/ | wc -l)
decom_policy_count=$(./mc admin policy list myminio/ | wc -l)