Change replication to use read lock instead of writelock (#12581)

Fixes #12573

This PR also adds audit logging for replication activity.
This commit is contained in:
Poorna Krishnamoorthy 2021-06-28 23:58:08 -07:00 committed by GitHub
parent ca79869078
commit a69c2a2fb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 137 additions and 46 deletions

View File

@ -605,6 +605,12 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio
// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{
Trigger: ReplicationIncomingActivity,
APIName: "s3:ReplicateObject",
VersionID: ri.VersionID,
Status: ri.ReplicationStatus.String(),
})
objInfo := ri.ObjectInfo
bucket := objInfo.Bucket
object := objInfo.Name
@ -631,7 +637,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
})
return
}
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, writeLock, ObjectOptions{
var closeOnDefer bool
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
VersionID: objInfo.VersionID,
})
if err != nil {
@ -644,8 +651,12 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
logger.LogIf(ctx, fmt.Errorf("Unable to update replicate for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
return
}
defer gr.Close() // hold write lock for entire transaction
defer func() {
if closeOnDefer {
gr.Close()
}
}()
closeOnDefer = true
objInfo = gr.ObjInfo
size, err := objInfo.GetActualSize()
@ -686,28 +697,29 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed {
// if metadata is not updated for some reason after replication, such as 503 encountered while updating metadata - make sure
// to set ReplicationStatus as Completed.Note that replication Stats would have been updated despite metadata update failure.
z, ok := objectAPI.(*erasureServerPools)
if !ok {
return
gr.Close()
closeOnDefer = false
popts := ObjectOptions{
MTime: objInfo.ModTime,
VersionID: objInfo.VersionID,
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
}
// This lower level implementation is necessary to avoid write locks from CopyObject.
poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
if err != nil {
for k, v := range objInfo.UserDefined {
popts.UserDefined[k] = v
}
popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Completed.String()
if objInfo.UserTags != "" {
popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
} else {
fi := FileInfo{}
fi.VersionID = objInfo.VersionID
fi.Metadata = make(map[string]string, len(objInfo.UserDefined))
for k, v := range objInfo.UserDefined {
fi.Metadata[k] = v
}
fi.Metadata[xhttp.AmzBucketReplicationStatus] = replication.Completed.String()
if objInfo.UserTags != "" {
fi.Metadata[xhttp.AmzObjectTagging] = objInfo.UserTags
}
if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
}
auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{
Trigger: ReplicationIncomingActivity,
APIName: "s3:ReplicateObject",
VersionID: ri.VersionID,
Status: ri.ReplicationStatus.String(),
})
}
}
return
@ -761,6 +773,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
}
}
gr.Close()
closeOnDefer = false
prevReplStatus := objInfo.ReplicationStatus
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
@ -778,28 +792,32 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
eventName = event.ObjectReplicationFailed
}
z, ok := objectAPI.(*erasureServerPools)
if !ok {
return
}
// Leave metadata in `PENDING` state if inline replication fails to save iops
if ri.OpType == replication.HealReplicationType ||
replicationStatus == replication.Completed {
// This lower level implementation is necessary to avoid write locks from CopyObject.
poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
if err != nil {
popts := ObjectOptions{
MTime: objInfo.ModTime,
VersionID: objInfo.VersionID,
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
}
for k, v := range objInfo.UserDefined {
popts.UserDefined[k] = v
}
popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Completed.String()
if objInfo.UserTags != "" {
popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
} else {
fi := FileInfo{}
fi.VersionID = objInfo.VersionID
fi.Metadata = make(map[string]string, len(objInfo.UserDefined))
for k, v := range objInfo.UserDefined {
fi.Metadata[k] = v
}
if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
}
auditLogInternal(context.Background(), objInfo.Bucket, objInfo.Name, AuditLogOptions{
Trigger: ReplicationIncomingActivity,
APIName: "s3:ReplicateObject",
VersionID: objInfo.VersionID,
Status: replicationStatus.String(),
})
}
opType := replication.MetadataReplicationType
if rtype == replicateAll {
opType = replication.ObjectReplicationType
@ -852,6 +870,23 @@ type DeletedObjectReplicationInfo struct {
ResetID string
}
// Audit-trail trigger names for the replication subsystem. Each value is
// recorded in the audit entry's Trigger field so the log shows which
// replication code path produced the event.
const (
	// ReplicationQueuedActivity - an object replication task was queued.
	ReplicationQueuedActivity = "replication:queue"
	// ReplicationIncomingActivity - replication of an object has started
	// (may originate from incoming, existing-object, or heal activity).
	ReplicationIncomingActivity = "replication:incoming"
	// ReplicationExistingActivity - replication queued for objects that
	// already existed before the replication rule was configured.
	ReplicationExistingActivity = "replication:existing"
	// ReplicationHealActivity - healing of failed/pending replications.
	ReplicationHealActivity = "replication:heal"
	// ReplicationMRFActivity - replication retried from the Most Recent
	// Failures (MRF) queue.
	ReplicationMRFActivity = "replication:mrf"
	// ReplicationDeleteActivity - replication of a delete/delete-marker.
	ReplicationDeleteActivity = "replication:delete"
	// ReplicationExistingDeleteActivity - delete replication triggered for
	// pre-existing delete markers.
	ReplicationExistingDeleteActivity = "replication:delete:existing"
)
var (
globalReplicationPool *ReplicationPool
globalReplicationStats *ReplicationStats
@ -1002,6 +1037,12 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
close(p.existingReplicaCh)
})
case p.mrfReplicaCh <- ri:
auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{
Trigger: ReplicationMRFActivity,
APIName: "s3:ReplicateObject",
VersionID: ri.VersionID,
Status: ri.ReplicationStatus.String(),
})
default:
}
}
@ -1011,9 +1052,14 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
return
}
var ch chan ReplicateObjectInfo
trigger := ReplicationQueuedActivity
switch ri.OpType {
case replication.ExistingObjectReplicationType:
ch = p.existingReplicaCh
trigger = ReplicationExistingActivity
case replication.HealReplicationType:
ch = p.replicaCh
trigger = ReplicationHealActivity
default:
ch = p.replicaCh
}
@ -1025,6 +1071,12 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
close(p.existingReplicaCh)
})
case ch <- ri:
auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{
Trigger: trigger,
APIName: "s3:ReplicateObject",
VersionID: ri.VersionID,
Status: string(ri.ReplicationStatus),
})
default:
}
}
@ -1033,10 +1085,15 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
if p == nil {
return
}
trigger := ReplicationDeleteActivity
var ch chan DeletedObjectReplicationInfo
switch doi.OpType {
case replication.ExistingObjectReplicationType:
ch = p.existingReplicaDeleteCh
trigger = ReplicationExistingDeleteActivity
case replication.HealReplicationType:
ch = p.replicaDeleteCh
trigger = ReplicationHealActivity
default:
ch = p.replicaDeleteCh
}
@ -1048,6 +1105,16 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
close(p.existingReplicaDeleteCh)
})
case ch <- doi:
replStatus := doi.DeleteMarkerReplicationStatus
if doi.VersionPurgeStatus != "" {
replStatus = string(doi.VersionPurgeStatus)
}
auditLogInternal(context.Background(), doi.Bucket, doi.ObjectName, AuditLogOptions{
Trigger: trigger,
APIName: "s3:ReplicateDelete",
VersionID: doi.VersionID,
Status: replStatus,
})
default:
}
}

View File

@ -40,7 +40,6 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/logger/message/audit"
"github.com/minio/pkg/console"
)
@ -1342,12 +1341,12 @@ func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
return nil
}
// ILMExpiryActivity - audit-trail trigger name recorded when an object is
// expired by the ILM (lifecycle) scanner rather than by a client request.
const ILMExpiryActivity = "ilm:expiry"
func auditLogLifecycle(ctx context.Context, bucket, object string) {
entry := audit.NewEntry(globalDeploymentID)
entry.Trigger = "internal-scanner"
entry.API.Name = "DeleteObject"
entry.API.Bucket = bucket
entry.API.Object = object
ctx = logger.SetAuditEntry(ctx, &entry)
logger.AuditLog(ctx, nil, nil, nil)
auditLogInternal(ctx, bucket, object, AuditLogOptions{
Trigger: ILMExpiryActivity,
APIName: "s3:ExpireObject",
})
}

View File

@ -48,6 +48,7 @@ import (
"github.com/minio/minio/internal/handlers"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/logger/message/audit"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/certs"
)
@ -940,3 +941,27 @@ func totalNodeCount() uint64 {
}
return totalNodesCount
}
// AuditLogOptions takes options for audit logging subsystem activity
type AuditLogOptions struct {
Trigger string // activity trail name, e.g. ReplicationQueuedActivity or ILMExpiryActivity
APIName string // logical API name reported in the trail, e.g. "s3:ReplicateObject"
Status string // outcome of the activity, e.g. a replication status string
VersionID string // object version the activity applies to; empty when not versioned
}
// auditLogInternal emits an audit-trail entry for activity generated by an
// internal subsystem (replication, ILM, ...) rather than an S3 request.
// VersionID, when set, is surfaced as the versionId request query parameter.
func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogOptions) {
	entry := audit.NewEntry(globalDeploymentID)
	entry.Trigger = opts.Trigger
	entry.API.Name = opts.APIName
	entry.API.Bucket = bucket
	entry.API.Object = object
	entry.API.Status = opts.Status
	if opts.VersionID != "" {
		entry.ReqQuery = map[string]string{
			xhttp.VersionID: opts.VersionID,
		}
	}
	ctx = logger.SetAuditEntry(ctx, &entry)
	logger.AuditLog(ctx, nil, nil, nil)
}