diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index bc4b9f4b7..85cc94fc7 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -605,6 +605,12 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio // replicateObject replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) { + auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{ + Trigger: ReplicationIncomingActivity, + APIName: "s3:ReplicateObject", + VersionID: ri.VersionID, + Status: ri.ReplicationStatus.String(), + }) objInfo := ri.ObjectInfo bucket := objInfo.Bucket object := objInfo.Name @@ -631,7 +637,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje }) return } - gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, writeLock, ObjectOptions{ + var closeOnDefer bool + gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ VersionID: objInfo.VersionID, }) if err != nil { @@ -644,8 +651,12 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje logger.LogIf(ctx, fmt.Errorf("Unable to update replicate for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err)) return } - - defer gr.Close() // hold write lock for entire transaction + defer func() { + if closeOnDefer { + gr.Close() + } + }() + closeOnDefer = true objInfo = gr.ObjInfo size, err := objInfo.GetActualSize() @@ -686,28 +697,29 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed { // if metadata is not updated for some reason after replication, such as 503 encountered while updating metadata - make sure // to set ReplicationStatus as Completed.Note that replication Stats would have been updated despite metadata update failure. - z, ok := objectAPI.(*erasureServerPools) - if !ok { - return + gr.Close() + closeOnDefer = false + popts := ObjectOptions{ + MTime: objInfo.ModTime, + VersionID: objInfo.VersionID, + UserDefined: make(map[string]string, len(objInfo.UserDefined)), } - // This lower level implementation is necessary to avoid write locks from CopyObject. - poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size) - if err != nil { + for k, v := range objInfo.UserDefined { + popts.UserDefined[k] = v + } + popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Completed.String() + if objInfo.UserTags != "" { + popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags + } + if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } else { - fi := FileInfo{} - fi.VersionID = objInfo.VersionID - fi.Metadata = make(map[string]string, len(objInfo.UserDefined)) - for k, v := range objInfo.UserDefined { - fi.Metadata[k] = v - } - fi.Metadata[xhttp.AmzBucketReplicationStatus] = replication.Completed.String() - if objInfo.UserTags != "" { - fi.Metadata[xhttp.AmzObjectTagging] = objInfo.UserTags - } - if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) - } + auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{ + Trigger: ReplicationIncomingActivity, + APIName: "s3:ReplicateObject", + VersionID: ri.VersionID, + Status: ri.ReplicationStatus.String(), + }) } } return @@ -761,6 +773,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) } } + gr.Close() + closeOnDefer = false prevReplStatus := objInfo.ReplicationStatus objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String() @@ -778,28 +792,32 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje eventName = event.ObjectReplicationFailed } - z, ok := objectAPI.(*erasureServerPools) - if !ok { - return - } // Leave metadata in `PENDING` state if inline replication fails to save iops if ri.OpType == replication.HealReplicationType || replicationStatus == replication.Completed { - // This lower level implementation is necessary to avoid write locks from CopyObject. - poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size) - if err != nil { + popts := ObjectOptions{ + MTime: objInfo.ModTime, + VersionID: objInfo.VersionID, + UserDefined: make(map[string]string, len(objInfo.UserDefined)), + } + for k, v := range objInfo.UserDefined { + popts.UserDefined[k] = v + } + popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Completed.String() + if objInfo.UserTags != "" { + popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags + } + if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } else { - fi := FileInfo{} - fi.VersionID = objInfo.VersionID - fi.Metadata = make(map[string]string, len(objInfo.UserDefined)) - for k, v := range objInfo.UserDefined { - fi.Metadata[k] = v - } - if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) - } + auditLogInternal(context.Background(), objInfo.Bucket, objInfo.Name, AuditLogOptions{ + Trigger: ReplicationIncomingActivity, + APIName: "s3:ReplicateObject", + VersionID: objInfo.VersionID, + Status: replicationStatus.String(), + }) } + opType := replication.MetadataReplicationType if rtype == replicateAll { opType = replication.ObjectReplicationType @@ -852,6 +870,23 @@ type DeletedObjectReplicationInfo struct { ResetID string } +const ( + // ReplicationQueuedActivity - replication being queued activity trail + ReplicationQueuedActivity = "replication:queue" + // ReplicationExistingActivity - activity trail for existing objects replication + ReplicationExistingActivity = "replication:existing" + // ReplicationMRFActivity - activity trail for replication from Most Recent Failures (MRF) queue + ReplicationMRFActivity = "replication:mrf" + // ReplicationIncomingActivity - activity trail indicating replication started [could be from incoming/existing/heal activity] + ReplicationIncomingActivity = "replication:incoming" + // ReplicationHealActivity - activity trail for healing of failed/pending replications + ReplicationHealActivity = "replication:heal" + // ReplicationDeleteActivity - activity trail for delete replication + ReplicationDeleteActivity = "replication:delete" + // ReplicationExistingDeleteActivity - activity trail for delete replication triggered for existing delete markers + ReplicationExistingDeleteActivity = "replication:delete:existing" +) + var ( globalReplicationPool *ReplicationPool globalReplicationStats *ReplicationStats @@ -1002,6 +1037,12 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) { close(p.existingReplicaCh) }) case p.mrfReplicaCh <- ri: + auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{ + Trigger: ReplicationMRFActivity, + APIName: "s3:ReplicateObject", + VersionID: ri.VersionID, + Status: ri.ReplicationStatus.String(), + }) default: } } @@ -1011,9 +1052,14 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { return } var ch chan ReplicateObjectInfo + trigger := ReplicationQueuedActivity switch ri.OpType { case replication.ExistingObjectReplicationType: ch = p.existingReplicaCh + trigger = ReplicationExistingActivity + case replication.HealReplicationType: + ch = p.replicaCh + trigger = ReplicationHealActivity default: ch = p.replicaCh } @@ -1025,6 +1071,12 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { close(p.existingReplicaCh) }) case ch <- ri: + auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{ + Trigger: trigger, + APIName: "s3:ReplicateObject", + VersionID: ri.VersionID, + Status: string(ri.ReplicationStatus), + }) default: } } @@ -1033,10 +1085,15 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf if p == nil { return } + trigger := ReplicationDeleteActivity var ch chan DeletedObjectReplicationInfo switch doi.OpType { case replication.ExistingObjectReplicationType: ch = p.existingReplicaDeleteCh + trigger = ReplicationExistingDeleteActivity + case replication.HealReplicationType: + ch = p.replicaDeleteCh + trigger = ReplicationHealActivity default: ch = p.replicaDeleteCh } @@ -1048,6 +1105,16 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf close(p.existingReplicaDeleteCh) }) case ch <- doi: + replStatus := doi.DeleteMarkerReplicationStatus + if doi.VersionPurgeStatus != "" { + replStatus = string(doi.VersionPurgeStatus) + } + auditLogInternal(context.Background(), doi.Bucket, doi.ObjectName, AuditLogOptions{ + Trigger: trigger, + APIName: "s3:ReplicateDelete", + VersionID: doi.VersionID, + Status: replStatus, + }) default: } } diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 78d6a2f4f..b8b42b02a 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -40,7 +40,6 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/logger/message/audit" "github.com/minio/pkg/console" ) @@ -1342,12 +1341,12 @@ func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error { return nil } +// ILMExpiryActivity - activity trail for ILM expiry +const ILMExpiryActivity = "ilm:expiry" + func auditLogLifecycle(ctx context.Context, bucket, object string) { - entry := audit.NewEntry(globalDeploymentID) - entry.Trigger = "internal-scanner" - entry.API.Name = "DeleteObject" - entry.API.Bucket = bucket - entry.API.Object = object - ctx = logger.SetAuditEntry(ctx, &entry) - logger.AuditLog(ctx, nil, nil, nil) + auditLogInternal(ctx, bucket, object, AuditLogOptions{ + Trigger: ILMExpiryActivity, + APIName: "s3:ExpireObject", + }) } diff --git a/cmd/utils.go b/cmd/utils.go index 926c73487..c2755a4ea 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -48,6 +48,7 @@ import ( "github.com/minio/minio/internal/handlers" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/logger/message/audit" "github.com/minio/minio/internal/rest" "github.com/minio/pkg/certs" ) @@ -940,3 +941,27 @@ func totalNodeCount() uint64 { } return totalNodesCount } + +// AuditLogOptions takes options for audit logging subsystem activity +type AuditLogOptions struct { + Trigger string + APIName string + Status string + VersionID string +} + +// sends audit logs for internal subsystem activity +func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogOptions) { + entry := audit.NewEntry(globalDeploymentID) + entry.Trigger = opts.Trigger + entry.API.Name = opts.APIName + entry.API.Bucket = bucket + entry.API.Object = object + if opts.VersionID != "" { + entry.ReqQuery = make(map[string]string) + entry.ReqQuery[xhttp.VersionID] = opts.VersionID + } + entry.API.Status = opts.Status + ctx = logger.SetAuditEntry(ctx, &entry) + logger.AuditLog(ctx, nil, nil, nil) +}