Add support for existing object replication. (#12109)

Also adding an API to allow resyncing replication when
existing object replication is enabled and the remote target
is entirely lost. With the `mc replicate reset` command, the
objects that are eligible for replication as per the replication
config will be resynced to target if existing object replication
is enabled on the rule.
This commit is contained in:
Poorna Krishnamoorthy
2021-06-01 19:59:11 -07:00
committed by GitHub
parent 1f262daf6f
commit dbea8d2ee0
19 changed files with 928 additions and 334 deletions

View File

@@ -91,32 +91,70 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
return false, BucketRemoteTargetNotFound{Bucket: bucket}
}
func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) (replicate bool, sync bool) {
type mustReplicateOptions struct {
meta map[string]string
status replication.StatusType
opType replication.Type
}
func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
if rs, ok := o.meta[xhttp.AmzBucketReplicationStatus]; ok {
return replication.StatusType(rs)
}
return s
}
func (o mustReplicateOptions) isExistingObjectReplication() bool {
return o.opType == replication.ExistingObjectReplicationType
}
func (o mustReplicateOptions) isMetadataReplication() bool {
return o.opType == replication.MetadataReplicationType
}
func getMustReplicateOptions(o ObjectInfo, op replication.Type) mustReplicateOptions {
if !op.Valid() {
op = replication.ObjectReplicationType
if o.metadataOnly {
op = replication.MetadataReplicationType
}
}
meta := cloneMSS(o.UserDefined)
if o.UserTags != "" {
meta[xhttp.AmzObjectTagging] = o.UserTags
}
return mustReplicateOptions{
meta: meta,
status: o.ReplicationStatus,
opType: op,
}
}
func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus replication.StatusType, permErr APIErrorCode) (replicate bool, sync bool) {
if permErr != ErrNone {
return
}
return mustReplicater(ctx, bucket, object, meta, replStatus, false)
return mustReplicater(ctx, bucket, object, mustReplicateOptions{
meta: meta,
status: replStatus,
opType: replication.ObjectReplicationType,
})
}
// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
// a synchronous manner.
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, metadataOnly bool) (replicate bool, sync bool) {
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, opts mustReplicateOptions) (replicate bool, sync bool) {
if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
return
}
return mustReplicater(ctx, bucket, object, meta, replStatus, metadataOnly)
return mustReplicater(ctx, bucket, object, opts)
}
// mustReplicater returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
// a synchronous manner.
func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string, metadataOnly bool) (replicate bool, sync bool) {
func mustReplicater(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (replicate bool, sync bool) {
if globalIsGateway {
return replicate, sync
}
if rs, ok := meta[xhttp.AmzBucketReplicationStatus]; ok {
replStatus = rs
}
if replication.StatusType(replStatus) == replication.Replica && !metadataOnly {
replStatus := mopts.ReplicationStatus()
if replStatus == replication.Replica && !mopts.isMetadataReplication() {
return replicate, sync
}
cfg, err := getReplicationConfig(ctx, bucket)
@@ -124,11 +162,12 @@ func mustReplicater(ctx context.Context, bucket, object string, meta map[string]
return replicate, sync
}
opts := replication.ObjectOpts{
Name: object,
SSEC: crypto.SSEC.IsEncrypted(meta),
Replica: replication.StatusType(replStatus) == replication.Replica,
Name: object,
SSEC: crypto.SSEC.IsEncrypted(mopts.meta),
Replica: replStatus == replication.Replica,
ExistingObject: mopts.isExistingObjectReplication(),
}
tagStr, ok := meta[xhttp.AmzObjectTagging]
tagStr, ok := mopts.meta[xhttp.AmzObjectTagging]
if ok {
opts.UserTags = tagStr
}
@@ -226,7 +265,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
// on target.
func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectAPI ObjectLayer) {
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) {
bucket := dobj.Bucket
versionID := dobj.DeleteMarkerVersionID
if versionID == "" {
@@ -760,7 +799,9 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
if objInfo.UserTags != "" {
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
if ri.OpType == replication.ExistingObjectReplicationType {
objInfo.UserDefined[xhttp.MinIOReplicationResetStatus] = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), ri.ResetID)
}
// FIXME: add support for missing replication events
// - event.ObjectReplicationMissedThreshold
// - event.ObjectReplicationReplicatedAfterThreshold
@@ -774,7 +815,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
return
}
// Leave metadata in `PENDING` state if inline replication fails to save iops
if ri.OpType == replication.HealReplicationType || replicationStatus == replication.Completed {
if ri.OpType == replication.HealReplicationType ||
replicationStatus == replication.Completed {
// This lower level implementation is necessary to avoid write locks from CopyObject.
poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
if err != nil {
@@ -834,10 +876,12 @@ func filterReplicationStatusMetadata(metadata map[string]string) map[string]stri
return dst
}
// DeletedObjectVersionInfo has info on deleted object
type DeletedObjectVersionInfo struct {
// DeletedObjectReplicationInfo has info on deleted object
type DeletedObjectReplicationInfo struct {
DeletedObject
Bucket string
Bucket string
OpType replication.Type
ResetID string
}
var (
@@ -847,35 +891,40 @@ var (
// ReplicationPool describes replication pool
type ReplicationPool struct {
objLayer ObjectLayer
ctx context.Context
mrfWorkerKillCh chan struct{}
workerKillCh chan struct{}
replicaCh chan ReplicateObjectInfo
replicaDeleteCh chan DeletedObjectVersionInfo
mrfReplicaCh chan ReplicateObjectInfo
workerSize int
mrfWorkerSize int
workerWg sync.WaitGroup
mrfWorkerWg sync.WaitGroup
once sync.Once
mu sync.Mutex
objLayer ObjectLayer
ctx context.Context
mrfWorkerKillCh chan struct{}
workerKillCh chan struct{}
replicaCh chan ReplicateObjectInfo
replicaDeleteCh chan DeletedObjectReplicationInfo
mrfReplicaCh chan ReplicateObjectInfo
existingReplicaCh chan ReplicateObjectInfo
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
workerSize int
mrfWorkerSize int
workerWg sync.WaitGroup
mrfWorkerWg sync.WaitGroup
once sync.Once
mu sync.Mutex
}
// NewReplicationPool creates a pool of replication workers of specified size
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
pool := &ReplicationPool{
replicaCh: make(chan ReplicateObjectInfo, 100000),
replicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000),
mrfReplicaCh: make(chan ReplicateObjectInfo, 100000),
workerKillCh: make(chan struct{}, opts.Workers),
mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers),
ctx: ctx,
objLayer: o,
replicaCh: make(chan ReplicateObjectInfo, 100000),
replicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
mrfReplicaCh: make(chan ReplicateObjectInfo, 100000),
workerKillCh: make(chan struct{}, opts.Workers),
mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers),
existingReplicaCh: make(chan ReplicateObjectInfo, 100000),
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
ctx: ctx,
objLayer: o,
}
pool.ResizeWorkers(opts.Workers)
pool.ResizeFailedWorkers(opts.FailedWorkers)
go pool.AddExistingObjectReplicateWorker()
return pool
}
@@ -921,6 +970,26 @@ func (p *ReplicationPool) AddWorker() {
}
// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd
func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
for {
select {
case <-p.ctx.Done():
return
case oi, ok := <-p.existingReplicaCh:
if !ok {
return
}
replicateObject(p.ctx, oi, p.objLayer)
case doi, ok := <-p.existingReplicaDeleteCh:
if !ok {
return
}
replicateDelete(p.ctx, doi, p.objLayer)
}
}
}
// ResizeWorkers sets replication workers pool to new size
func (p *ReplicationPool) ResizeWorkers(n int) {
p.mu.Lock()
@@ -962,6 +1031,7 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
p.once.Do(func() {
close(p.replicaCh)
close(p.mrfReplicaCh)
close(p.existingReplicaCh)
})
case p.mrfReplicaCh <- ri:
default:
@@ -972,27 +1042,44 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
if p == nil {
return
}
var ch chan ReplicateObjectInfo
switch ri.OpType {
case replication.ExistingObjectReplicationType:
ch = p.existingReplicaCh
default:
ch = p.replicaCh
}
select {
case <-GlobalContext.Done():
p.once.Do(func() {
close(p.replicaCh)
close(p.mrfReplicaCh)
close(p.existingReplicaCh)
})
case p.replicaCh <- ri:
case ch <- ri:
default:
}
}
func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) {
func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInfo) {
if p == nil {
return
}
var ch chan DeletedObjectReplicationInfo
switch doi.OpType {
case replication.ExistingObjectReplicationType:
ch = p.existingReplicaDeleteCh
default:
ch = p.replicaDeleteCh
}
select {
case <-GlobalContext.Done():
p.once.Do(func() {
close(p.replicaDeleteCh)
close(p.existingReplicaDeleteCh)
})
case p.replicaDeleteCh <- doi:
case ch <- doi:
default:
}
}
@@ -1157,7 +1244,78 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
}
}
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, o ObjectLayer, sync bool) {
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer, sync bool) {
globalReplicationPool.queueReplicaDeleteTask(dv)
globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
}
type replicationConfig struct {
Config *replication.Config
ResetID string
ResetBeforeDate time.Time
}
func (c replicationConfig) Empty() bool {
return c.Config == nil
}
func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
return c.Config.Replicate(opts)
}
// Resync returns true if replication reset is requested
func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo) bool {
if c.Empty() {
return false
}
// existing object replication does not apply to un-versioned objects
if oi.VersionID == "" || oi.VersionID == nullVersionID {
return false
}
var replicate bool
if oi.DeleteMarker {
if c.Replicate(replication.ObjectOpts{
Name: oi.Name,
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
UserTags: oi.UserTags,
DeleteMarker: oi.DeleteMarker,
VersionID: oi.VersionID,
OpType: replication.DeleteReplicationType,
ExistingObject: true}) {
replicate = true
}
} else {
// Ignore previous replication status when deciding if object can be re-replicated
objInfo := oi.Clone()
objInfo.ReplicationStatus = replication.StatusType("")
replicate, _ = mustReplicater(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(objInfo, replication.ExistingObjectReplicationType))
}
return c.resync(oi, replicate)
}
// wrapper function for testability. Returns true if a new reset is requested on
// already replicated objects OR object qualifies for existing object replication
// and no reset requested.
func (c replicationConfig) resync(oi ObjectInfo, replicate bool) bool {
if !replicate {
return false
}
rs, ok := oi.UserDefined[xhttp.MinIOReplicationResetStatus]
if !ok { // existing object replication is enabled and object version is unreplicated so far.
if c.ResetID != "" && oi.ModTime.Before(c.ResetBeforeDate) { // trigger replication if `mc replicate reset` requested
return true
}
return oi.ReplicationStatus != replication.Completed
}
if c.ResetID == "" || c.ResetBeforeDate.Equal(timeSentinel) { // no reset in progress
return false
}
// if already replicated, return true if a new reset was requested.
splits := strings.SplitN(rs, ";", 2)
newReset := splits[1] != c.ResetID
if !newReset && oi.ReplicationStatus == replication.Completed {
// already replicated and no reset requested
return false
}
return newReset && oi.ModTime.Before(c.ResetBeforeDate)
}