fix: simplify passing auditLog eventType (#15278)

Rename Trigger -> Event to be a more appropriate
name for the audit event.

Bonus: fixes a bug in AddMRFWorker() where it did not
mark the waitgroup as done, leading to waitgroup leaks.
Harshavardhana
2022-07-12 10:43:32 -07:00
committed by GitHub
parent b4eb74f5ff
commit 0a8b78cb84
8 changed files with 56 additions and 36 deletions
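
For context, a minimal sketch of the pattern this commit moves to, using hypothetical
stand-in types (replicateInfo is not MinIO's ReplicateObjectInfo): the audit event type
rides on the replication info struct instead of being threaded through every call site
as an extra trigger argument.

package main

import "fmt"

// Stand-ins for the audit event constants touched in this diff.
const (
	ReplicateIncoming = "replicate:incoming"
	ReplicateMRF      = "replicate:mrf"
)

// replicateInfo is a hypothetical stand-in for ReplicateObjectInfo:
// the EventType field replaces the old trigger parameter.
type replicateInfo struct {
	Bucket, Name string
	EventType    string
}

// replicateObject reads the event type off the struct when writing the
// audit entry, so callers no longer pass a separate trigger string.
func replicateObject(ri replicateInfo) {
	fmt.Printf("audit: api=ReplicateObject event=%s object=%s/%s\n",
		ri.EventType, ri.Bucket, ri.Name)
}

func main() {
	ri := replicateInfo{Bucket: "photos", Name: "a.jpg", EventType: ReplicateIncoming}
	replicateObject(ri)

	// A retry path only has to update the field before re-queuing.
	ri.EventType = ReplicateMRF
	replicateObject(ri)
}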

@@ -347,7 +347,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
// on target.
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, trigger string) {
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) {
var replicationStatus replication.StatusType
bucket := dobj.Bucket
versionID := dobj.DeleteMarkerVersionID
@@ -358,7 +358,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
defer func() {
replStatus := string(replicationStatus)
auditLogInternal(context.Background(), bucket, dobj.ObjectName, AuditLogOptions{
Trigger: trigger,
Event: dobj.EventType,
APIName: ReplicateDeleteAPI,
VersionID: versionID,
Status: replStatus,
@@ -857,7 +857,7 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo, opType replicati
// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, trigger string) {
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
var replicationStatus replication.StatusType
defer func() {
if replicationStatus.Empty() {
@@ -868,7 +868,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
replicationStatus = ri.ReplicationStatus
}
auditLogInternal(ctx, ri.Bucket, ri.Name, AuditLogOptions{
Trigger: trigger,
Event: ri.EventType,
APIName: ReplicateObjectAPI,
VersionID: ri.VersionID,
Status: replicationStatus.String(),
@@ -990,6 +990,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
// the target site is down. Leave it to scanner to catch up instead.
if rinfos.ReplicationStatus() != replication.Completed && ri.RetryCount < 1 {
ri.OpType = replication.HealReplicationType
ri.EventType = ReplicateMRF
ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
ri.RetryCount++
globalReplicationPool.queueReplicaFailedTask(ri)
@@ -1291,6 +1292,7 @@ func filterReplicationStatusMetadata(metadata map[string]string) map[string]stri
type DeletedObjectReplicationInfo struct {
DeletedObject
Bucket string
EventType string
OpType replication.Type
ResetID string
TargetArn string
@@ -1313,12 +1315,15 @@ const (
// ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
ReplicateMRF = "replicate:mrf"
// ReplicateIncoming - audit trail indicating replication started [could be from incoming/existing/heal activity]
// ReplicateIncoming - audit trail of inline replication
ReplicateIncoming = "replicate:incoming"
// ReplicateIncomingDelete - audit trail of inline replication of deletes.
ReplicateIncomingDelete = "replicate:incoming:delete"
// ReplicateHeal - audit trail for healing of failed/pending replications
ReplicateHeal = "replicate:heal"
// ReplicateDelete - audit trail for delete replication
ReplicateDelete = "replicate:delete"
// ReplicateHealDelete - audit trail of healing of failed/pending delete replications.
ReplicateHealDelete = "replicate:heal:delete"
)
var (
@@ -1364,13 +1369,14 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
pool.ResizeWorkers(opts.Workers)
pool.ResizeFailedWorkers(opts.FailedWorkers)
go pool.AddExistingObjectReplicateWorker()
go pool.periodicResyncMetaSave(ctx, o)
go pool.updateResyncStatus(ctx, o)
return pool
}
// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
// to the other workers
func (p *ReplicationPool) AddMRFWorker() {
defer p.mrfWorkerWg.Done()
for {
select {
case <-p.ctx.Done():
@@ -1379,7 +1385,7 @@ func (p *ReplicationPool) AddMRFWorker() {
if !ok {
return
}
replicateObject(p.ctx, oi, p.objLayer, ReplicateMRF)
replicateObject(p.ctx, oi, p.objLayer)
case <-p.mrfWorkerKillCh:
return
}
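
The bonus fix is the defer p.mrfWorkerWg.Done() added to AddMRFWorker() above. A minimal,
self-contained sketch of why it matters (hypothetical pool type, not MinIO's ReplicationPool):
every goroutine registered with wg.Add(1) must call Done() when it exits, otherwise a later
Wait() blocks forever and the waitgroup leaks.

package main

import (
	"fmt"
	"sync"
)

type pool struct {
	wg     sync.WaitGroup
	killCh chan struct{}
}

// addWorker registers the worker on the waitgroup before spawning it.
func (p *pool) addWorker() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done() // the missing call: without it, Wait() below never returns
		<-p.killCh        // worker loop elided; exit when asked to stop
	}()
}

func main() {
	p := &pool{killCh: make(chan struct{})}
	p.addWorker()
	close(p.killCh) // shutdown/resize asks workers to stop...
	p.wg.Wait()     // ...and can now actually wait for them to drain
	fmt.Println("workers drained")
}
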
@@ -1397,12 +1403,12 @@ func (p *ReplicationPool) AddWorker() {
if !ok {
return
}
replicateObject(p.ctx, oi, p.objLayer, ReplicateIncoming)
replicateObject(p.ctx, oi, p.objLayer)
case doi, ok := <-p.replicaDeleteCh:
if !ok {
return
}
replicateDelete(p.ctx, doi, p.objLayer, ReplicateDelete)
replicateDelete(p.ctx, doi, p.objLayer)
case <-p.workerKillCh:
return
}
@@ -1419,12 +1425,12 @@ func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
if !ok {
return
}
replicateObject(p.ctx, oi, p.objLayer, ReplicateExisting)
replicateObject(p.ctx, oi, p.objLayer)
case doi, ok := <-p.existingReplicaDeleteCh:
if !ok {
return
}
replicateDelete(p.ctx, doi, p.objLayer, ReplicateExistingDelete)
replicateDelete(p.ctx, doi, p.objLayer)
}
}
}
@@ -1482,7 +1488,7 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
})
case p.mrfReplicaCh <- ri:
default:
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication failed workers could not keep up with healing failures - consider increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), replicationSubsystem)
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up retrying failed replication - we recommend increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), replicationSubsystem)
}
}
@@ -1508,7 +1514,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
})
case ch <- ri:
default:
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem)
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem)
}
}
@@ -1545,7 +1551,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
})
case ch <- doi:
default:
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem)
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem)
}
}
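
The three queueReplica*Task functions above share the same non-blocking enqueue shape: try to
hand the task to a worker channel, and if no worker can take it, fall through to a default
branch that logs the overload hint. A small generic sketch of that pattern (stand-in types,
not MinIO's):

package main

import "fmt"

type task struct{ object string }

// tryQueue attempts a non-blocking send; if the channel cannot accept the
// task, the default branch fires and the task is dropped with a warning,
// mirroring the LogOnceIf overload messages in the diff.
func tryQueue(ch chan task, t task) {
	select {
	case ch <- t:
		// enqueued (in this sketch, into the channel's buffer)
	default:
		fmt.Printf("WARNING: workers busy, dropping %q; consider adding workers\n", t.object)
	}
}

func main() {
	ch := make(chan task, 1) // buffered so the first send succeeds without a reader
	tryQueue(ch, task{"a.jpg"})
	tryQueue(ch, task{"b.jpg"}) // buffer full, no receiver: hits the default branch
}
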
@@ -1722,10 +1728,11 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, rs
}
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) {
ri := ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc, EventType: ReplicateIncoming}
if dsc.Synchronous() {
replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc}, o, ReplicateIncoming)
replicateObject(ctx, ri, o)
} else {
globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc})
globalReplicationPool.queueReplicaTask(ri)
}
if sz, err := objInfo.GetActualSize(); err == nil {
for arn := range dsc.targetsMap {
@@ -1956,10 +1963,10 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
return calculateBucketReplicationStats(bucket, u, bucketStats)
}
const resyncTimeInterval = time.Minute * 10
const resyncTimeInterval = time.Minute * 1
// periodicResyncMetaSave saves in-memory resync meta stats to disk in periodic intervals
func (p *ReplicationPool) periodicResyncMetaSave(ctx context.Context, objectAPI ObjectLayer) {
// updateResyncStatus persists in-memory resync metadata stats to disk at periodic intervals
func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI ObjectLayer) {
resyncTimer := time.NewTimer(resyncTimeInterval)
defer resyncTimer.Stop()
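
updateResyncStatus (renamed from periodicResyncMetaSave) drives a periodic persist loop off
the timer created above. The loop body is outside this diff, so the following is only a rough
sketch of the usual shape, with saveState as a hypothetical stand-in for the real persistence
call:

package main

import (
	"context"
	"fmt"
	"time"
)

// saveState is a hypothetical stand-in for persisting the in-memory resync
// metadata to disk; the real work lives elsewhere in the package.
func saveState() { fmt.Println("resync metadata saved") }

// run re-arms the timer after each save and stops when the context is done,
// which is the usual shape for this kind of periodic persister.
func run(ctx context.Context, interval time.Duration) {
	resyncTimer := time.NewTimer(interval)
	defer resyncTimer.Stop()

	for {
		select {
		case <-resyncTimer.C:
			saveState()
			resyncTimer.Reset(interval) // re-arm for the next interval
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()
	run(ctx, 50*time.Millisecond) // saves roughly twice, then returns on ctx deadline
}
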
@@ -2085,13 +2092,15 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
DeleteMarker: roi.DeleteMarker,
},
Bucket: roi.Bucket,
OpType: replication.ExistingObjectReplicationType,
Bucket: roi.Bucket,
OpType: replication.ExistingObjectReplicationType,
EventType: ReplicateExistingDelete,
}
replicateDelete(ctx, doi, objectAPI, ReplicateDelete)
replicateDelete(ctx, doi, objectAPI)
} else {
roi.OpType = replication.ExistingObjectReplicationType
replicateObject(ctx, roi, objectAPI, ReplicateExisting)
roi.EventType = ReplicateExisting
replicateObject(ctx, roi, objectAPI)
}
_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, miniogo.StatObjectOptions{
VersionID: roi.VersionID,