From 0a8b78cb84f464356841ee6814c5c60d619d5a92 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 12 Jul 2022 10:43:32 -0700 Subject: [PATCH] fix: simplify passing auditLog eventType (#15278) Rename Trigger -> Event to be a more appropriate name for the audit event. Bonus: fixes a bug in AddMRFWorker() it did not cancel the waitgroup, leading to waitgroup leaks. --- cmd/bucket-handlers.go | 1 + cmd/bucket-replication.go | 59 +++++++++++++++----------- cmd/data-scanner.go | 14 +++--- cmd/erasure-server-pool-decom.go | 2 +- cmd/object-api-datatypes.go | 1 + cmd/object-handlers.go | 3 +- cmd/utils.go | 5 ++- internal/logger/message/audit/entry.go | 7 ++- 8 files changed, 56 insertions(+), 36 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 1da1c1e0f..4183a74a5 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -649,6 +649,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, dv := DeletedObjectReplicationInfo{ DeletedObject: dobj, Bucket: bucket, + EventType: ReplicateIncomingDelete, } scheduleReplicationDelete(ctx, dv, objectAPI) } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 6f0a93e54..00f86e977 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -347,7 +347,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet // target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently // deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds // on target. -func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, trigger string) { +func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) { var replicationStatus replication.StatusType bucket := dobj.Bucket versionID := dobj.DeleteMarkerVersionID @@ -358,7 +358,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj defer func() { replStatus := string(replicationStatus) auditLogInternal(context.Background(), bucket, dobj.ObjectName, AuditLogOptions{ - Trigger: trigger, + Event: dobj.EventType, APIName: ReplicateDeleteAPI, VersionID: versionID, Status: replStatus, @@ -857,7 +857,7 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo, opType replicati // replicateObject replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. -func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, trigger string) { +func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) { var replicationStatus replication.StatusType defer func() { if replicationStatus.Empty() { @@ -868,7 +868,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje replicationStatus = ri.ReplicationStatus } auditLogInternal(ctx, ri.Bucket, ri.Name, AuditLogOptions{ - Trigger: trigger, + Event: ri.EventType, APIName: ReplicateObjectAPI, VersionID: ri.VersionID, Status: replicationStatus.String(), @@ -990,6 +990,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje // the target site is down. Leave it to scanner to catch up instead. 
if rinfos.ReplicationStatus() != replication.Completed && ri.RetryCount < 1 { ri.OpType = replication.HealReplicationType + ri.EventType = ReplicateMRF ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal() ri.RetryCount++ globalReplicationPool.queueReplicaFailedTask(ri) @@ -1291,6 +1292,7 @@ func filterReplicationStatusMetadata(metadata map[string]string) map[string]stri type DeletedObjectReplicationInfo struct { DeletedObject Bucket string + EventType string OpType replication.Type ResetID string TargetArn string @@ -1313,12 +1315,15 @@ const ( // ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue ReplicateMRF = "replicate:mrf" - // ReplicateIncoming - audit trail indicating replication started [could be from incoming/existing/heal activity] + // ReplicateIncoming - audit trail of inline replication ReplicateIncoming = "replicate:incoming" + // ReplicateIncomingDelete - audit trail of inline replication of deletes. + ReplicateIncomingDelete = "replicate:incoming:delete" + // ReplicateHeal - audit trail for healing of failed/pending replications ReplicateHeal = "replicate:heal" - // ReplicateDelete - audit trail for delete replication - ReplicateDelete = "replicate:delete" + // ReplicateHealDelete - audit trail of healing of failed/pending delete replications. + ReplicateHealDelete = "replicate:heal:delete" ) var ( @@ -1364,13 +1369,14 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool pool.ResizeWorkers(opts.Workers) pool.ResizeFailedWorkers(opts.FailedWorkers) go pool.AddExistingObjectReplicateWorker() - go pool.periodicResyncMetaSave(ctx, o) + go pool.updateResyncStatus(ctx, o) return pool } // AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued // to the other workers func (p *ReplicationPool) AddMRFWorker() { + defer p.mrfWorkerWg.Done() for { select { case <-p.ctx.Done(): @@ -1379,7 +1385,7 @@ func (p *ReplicationPool) AddMRFWorker() { if !ok { return } - replicateObject(p.ctx, oi, p.objLayer, ReplicateMRF) + replicateObject(p.ctx, oi, p.objLayer) case <-p.mrfWorkerKillCh: return } @@ -1397,12 +1403,12 @@ func (p *ReplicationPool) AddWorker() { if !ok { return } - replicateObject(p.ctx, oi, p.objLayer, ReplicateIncoming) + replicateObject(p.ctx, oi, p.objLayer) case doi, ok := <-p.replicaDeleteCh: if !ok { return } - replicateDelete(p.ctx, doi, p.objLayer, ReplicateDelete) + replicateDelete(p.ctx, doi, p.objLayer) case <-p.workerKillCh: return } @@ -1419,12 +1425,12 @@ func (p *ReplicationPool) AddExistingObjectReplicateWorker() { if !ok { return } - replicateObject(p.ctx, oi, p.objLayer, ReplicateExisting) + replicateObject(p.ctx, oi, p.objLayer) case doi, ok := <-p.existingReplicaDeleteCh: if !ok { return } - replicateDelete(p.ctx, doi, p.objLayer, ReplicateExistingDelete) + replicateDelete(p.ctx, doi, p.objLayer) } } } @@ -1482,7 +1488,7 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) { }) case p.mrfReplicaCh <- ri: default: - logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication failed workers could not keep up with healing failures - consider increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), replicationSubsystem) + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up retrying failed replication - we recommend increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", 
p.suggestedWorkers(true)), replicationSubsystem) } } @@ -1508,7 +1514,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { }) case ch <- ri: default: - logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem) + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem) } } @@ -1545,7 +1551,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf }) case ch <- doi: default: - logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem) + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem) } } @@ -1722,10 +1728,11 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, rs } func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) { + ri := ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc, EventType: ReplicateIncoming} if dsc.Synchronous() { - replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc}, o, ReplicateIncoming) + replicateObject(ctx, ri, o) } else { - globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc}) + globalReplicationPool.queueReplicaTask(ri) } if sz, err := objInfo.GetActualSize(); err == nil { for arn := range dsc.targetsMap { @@ -1956,10 +1963,10 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic return calculateBucketReplicationStats(bucket, u, bucketStats) } -const resyncTimeInterval = time.Minute * 10 +const resyncTimeInterval = time.Minute * 1 -// periodicResyncMetaSave saves in-memory resync meta stats to disk in periodic intervals -func (p *ReplicationPool) periodicResyncMetaSave(ctx context.Context, objectAPI ObjectLayer) { +// updateResyncStatus persists in-memory resync metadata stats to disk at periodic intervals +func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI ObjectLayer) { resyncTimer := time.NewTimer(resyncTimeInterval) defer resyncTimer.Stop() @@ -2085,13 +2092,15 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, DeleteMarker: roi.DeleteMarker, }, - Bucket: roi.Bucket, - OpType: replication.ExistingObjectReplicationType, + Bucket: roi.Bucket, + OpType: replication.ExistingObjectReplicationType, + EventType: ReplicateExistingDelete, } - replicateDelete(ctx, doi, objectAPI, ReplicateDelete) + replicateDelete(ctx, doi, objectAPI) } else { roi.OpType = replication.ExistingObjectReplicationType - replicateObject(ctx, roi, objectAPI, ReplicateExisting) + roi.EventType = ReplicateExisting + replicateObject(ctx, roi, objectAPI) } _, err = tgt.StatObject(ctx, tgt.Bucket, 
roi.Name, miniogo.StatObjectOptions{ VersionID: roi.VersionID, diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 9eeb9b3af..23dde8266 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1278,12 +1278,14 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj switch oi.ReplicationStatus { case replication.Pending, replication.Failed: + roi.EventType = ReplicateHeal globalReplicationPool.queueReplicaTask(roi) return case replication.Replica: sizeS.replicaSize += oi.Size } if roi.ExistingObjResync.mustResync() { + roi.EventType = ReplicateExisting globalReplicationPool.queueReplicaTask(roi) } } @@ -1309,11 +1311,13 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, DeleteMarker: roi.DeleteMarker, }, - Bucket: roi.Bucket, - OpType: replication.HealReplicationType, + Bucket: roi.Bucket, + OpType: replication.HealReplicationType, + EventType: ReplicateHealDelete, } if roi.ExistingObjResync.mustResync() { doi.OpType = replication.ExistingObjectReplicationType + doi.EventType = ReplicateExistingDelete queueReplicateDeletesWrapper(doi, roi.ExistingObjResync) return } @@ -1476,9 +1480,9 @@ const ( ILMTransition = " ilm:transition" ) -func auditLogLifecycle(ctx context.Context, oi ObjectInfo, trigger string) { +func auditLogLifecycle(ctx context.Context, oi ObjectInfo, event string) { var apiName string - switch trigger { + switch event { case ILMExpiry: apiName = "ILMExpiry" case ILMFreeVersionDelete: @@ -1487,7 +1491,7 @@ func auditLogLifecycle(ctx context.Context, oi ObjectInfo, trigger string) { apiName = "ILMTransition" } auditLogInternal(ctx, oi.Bucket, oi.Name, AuditLogOptions{ - Trigger: trigger, + Event: event, APIName: apiName, VersionID: oi.VersionID, }) diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index cfae29377..98b7c2c57 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -1151,7 +1151,7 @@ func auditLogDecom(ctx context.Context, apiName, bucket, object, versionID strin errStr = err.Error() } auditLogInternal(ctx, bucket, object, AuditLogOptions{ - Trigger: "decommissioning", + Event: "decommission", APIName: apiName, VersionID: versionID, Error: errStr, diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 54a4fa73a..1ba4d1638 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -262,6 +262,7 @@ func (o ObjectInfo) tierStats() tierStats { type ReplicateObjectInfo struct { ObjectInfo OpType replication.Type + EventType string RetryCount uint32 ResetID string Dsc ReplicateDecision diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 2dc4fbc3a..44b8ff0a6 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -3567,7 +3567,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. 
DeleteMarker: objInfo.DeleteMarker, ReplicationState: objInfo.getReplicationState(dsc.String(), opts.VersionID, false), }, - Bucket: bucket, + Bucket: bucket, + EventType: ReplicateIncomingDelete, } scheduleReplicationDelete(ctx, dobj, objectAPI) } diff --git a/cmd/utils.go b/cmd/utils.go index 3a2a91bcc..9d43eb05a 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -1036,7 +1036,7 @@ func totalNodeCount() uint64 { // AuditLogOptions takes options for audit logging subsystem activity type AuditLogOptions struct { - Trigger string + Event string APIName string Status string VersionID string @@ -1046,7 +1046,8 @@ type AuditLogOptions struct { // sends audit logs for internal subsystem activity func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogOptions) { entry := audit.NewEntry(globalDeploymentID) - entry.Trigger = opts.Trigger + entry.Trigger = opts.Event + entry.Event = opts.Event entry.Error = opts.Error entry.API.Name = opts.APIName entry.API.Bucket = bucket diff --git a/internal/logger/message/audit/entry.go b/internal/logger/message/audit/entry.go index ef02d1dce..ab76018b0 100644 --- a/internal/logger/message/audit/entry.go +++ b/internal/logger/message/audit/entry.go @@ -40,8 +40,11 @@ type Entry struct { Version string `json:"version"` DeploymentID string `json:"deploymentid,omitempty"` Time time.Time `json:"time"` - Trigger string `json:"trigger"` - API struct { + Event string `json:"event"` + // deprecated replaced by 'Event', kept here for some + // time for backward compatibility with k8s Operator. + Trigger string `json:"trigger"` + API struct { Name string `json:"name,omitempty"` Bucket string `json:"bucket,omitempty"` Object string `json:"object,omitempty"`
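
The "Bonus" fix above adds defer p.mrfWorkerWg.Done() to AddMRFWorker(), so the worker decrements its wait group on every return path (context cancellation, closed MRF channel, or kill signal) instead of leaking a pending count. What follows is a minimal standalone sketch of that pattern, using only the standard library and hypothetical names (pool, addWorker) rather than MinIO's actual types:

// Standalone sketch (not MinIO code) of the waitgroup discipline the patch
// adds to AddMRFWorker(): every worker must call Done() on the WaitGroup it
// was registered with, no matter which select branch makes it return.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type pool struct {
	ctx   context.Context
	wg    sync.WaitGroup
	jobs  chan int
	killC chan struct{}
}

// addWorker mirrors the fixed AddMRFWorker(): the deferred Done() runs on
// every exit path (context cancellation, closed job channel, kill signal).
// Without it, wg.Wait() below blocks forever -- the leak the commit
// message describes.
func (p *pool) addWorker() {
	defer p.wg.Done()
	for {
		select {
		case <-p.ctx.Done():
			return
		case j, ok := <-p.jobs:
			if !ok {
				return
			}
			fmt.Println("processed job", j)
		case <-p.killC:
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	p := &pool{ctx: ctx, jobs: make(chan int), killC: make(chan struct{})}

	p.wg.Add(1)
	go p.addWorker()

	p.jobs <- 42
	cancel() // stop the worker via its context

	done := make(chan struct{})
	go func() { p.wg.Wait(); close(done) }()
	select {
	case <-done:
		fmt.Println("worker exited cleanly")
	case <-time.After(time.Second):
		fmt.Println("waitgroup leak: worker never called Done()")
	}
}

Running the sketch prints "worker exited cleanly"; removing the deferred Done() reproduces the leak described in the commit message, with wg.Wait() blocking until the timeout fires.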
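
On the audit side, auditLogInternal now copies opts.Event into both entry.Event and the deprecated entry.Trigger, so consumers that still parse "trigger" (such as the k8s Operator noted in entry.go) keep working while new consumers read "event". The following is a trimmed, standalone sketch of that dual-write; the structs are reduced to the fields relevant here and the literal values in main are illustrative (the event string matches the ReplicateIncomingDelete constant from this patch, the API name is a placeholder):

// Standalone sketch (field names follow the patch; structs are trimmed)
// of writing the same event value to both the new and the deprecated
// JSON fields of the audit entry.
package main

import (
	"encoding/json"
	"fmt"
)

// AuditLogOptions carries the renamed field: Trigger -> Event.
type AuditLogOptions struct {
	Event   string
	APIName string
}

// Entry is a trimmed version of the audit entry in the patch.
type Entry struct {
	Event string `json:"event"`
	// Deprecated: replaced by Event, kept for backward compatibility.
	Trigger string `json:"trigger"`
	API     struct {
		Name string `json:"name,omitempty"`
	} `json:"api"`
}

func newAuditEntry(opts AuditLogOptions) Entry {
	var e Entry
	e.Event = opts.Event
	e.Trigger = opts.Event // dual-write for consumers still reading "trigger"
	e.API.Name = opts.APIName
	return e
}

func main() {
	e := newAuditEntry(AuditLogOptions{
		Event:   "replicate:incoming:delete", // value of ReplicateIncomingDelete in the patch
		APIName: "ReplicateDelete",           // placeholder API name
	})
	b, _ := json.Marshal(e)
	fmt.Println(string(b)) // both "event" and "trigger" carry the same value
}

The JSON output carries the same value under both "event" and "trigger"; the deprecated field can be dropped once downstream consumers have migrated to "event".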