From 44a3b58e52cde6db89fdb99bcc0ea3713c5ad85e Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Wed, 4 May 2022 08:45:27 +0100 Subject: [PATCH] Add audit log for decommissioning (#14858) --- cmd/erasure-multipart.go | 14 ++++++++++++ cmd/erasure-object.go | 14 ++++++++++++ cmd/erasure-server-pool-decom.go | 31 ++++++++++++++++++++++++-- cmd/erasure-sets.go | 21 ++--------------- cmd/utils.go | 7 ++++++ internal/logger/message/audit/entry.go | 2 ++ 6 files changed, 68 insertions(+), 21 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index c3899d51f..43ce216db 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -190,6 +190,8 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto // towards simplification of multipart APIs. // The resulting ListMultipartsInfo structure is unmarshalled directly as XML. func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { + auditObjectErasureSet(ctx, object, &er) + result.MaxUploads = maxUploads result.KeyMarker = keyMarker result.Prefix = object @@ -371,6 +373,8 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, // // Implements S3 compatible initiate multipart API. func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) { + auditObjectErasureSet(ctx, object, &er) + // No metadata is set, allocate a new one. if opts.UserDefined == nil { opts.UserDefined = make(map[string]string) @@ -446,6 +450,8 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds // // Implements S3 compatible Upload Part API. func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { + auditObjectErasureSet(ctx, object, &er) + // Write lock for this part ID. // Held throughout the operation. partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) @@ -683,6 +689,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // - encrypted // - compressed func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) { + auditObjectErasureSet(ctx, object, &er) + result := MultipartInfo{ Bucket: bucket, Object: object, @@ -739,6 +747,8 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u // ListPartsInfo structure is marshaled directly into XML and // replied back to the client. func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) { + auditObjectErasureSet(ctx, object, &er) + uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) if err != nil { @@ -832,6 +842,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up // // Implements S3 compatible Complete multipart API. func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) { + auditObjectErasureSet(ctx, object, &er) + // Hold read-locks to verify uploaded parts, also disallows // parallel part uploads as well. uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) @@ -1028,6 +1040,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // would be removed from the system, rollback is not possible on this // operation. func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) { + auditObjectErasureSet(ctx, object, &er) + lk := er.NewNSLock(bucket, pathJoin(object, uploadID)) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 738518c87..77704dee7 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -63,6 +63,8 @@ func countOnlineDisks(onlineDisks []StorageAPI) (online int) { // if source object and destination object are same we only // update metadata. func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) { + auditObjectErasureSet(ctx, dstObject, &er) + // This call shouldn't be used for anything other than metadata updates or adding self referential versions. if !srcInfo.metadataOnly { return oi, NotImplemented{} @@ -177,6 +179,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // GetObjectNInfo - returns object info and an object // Read(Closer). When err != nil, the returned reader is always nil. func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { + auditObjectErasureSet(ctx, object, &er) + var unlockOnDefer bool nsUnlocker := func() {} defer func() { @@ -413,6 +417,8 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) { + auditObjectErasureSet(ctx, object, &er) + if !opts.NoLock { // Lock the object before reading. lk := er.NewNSLock(bucket, object) @@ -832,6 +838,8 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st // putObject wrapper for erasureObjects PutObject func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { + auditObjectErasureSet(ctx, object, &er) + data := r.Reader // No metadata is set, allocate a new one. @@ -1148,6 +1156,10 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object // into smaller bulks if some object names are found to be duplicated in the delete list, splitting // into smaller bulks will avoid holding twice the write lock of the duplicated object names. func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) { + for _, obj := range objects { + auditObjectErasureSet(ctx, obj.ObjectV.ObjectName, &er) + } + errs := make([]error, len(objects)) dobjects := make([]DeletedObject, len(objects)) writeQuorums := make([]int, len(objects)) @@ -1328,6 +1340,8 @@ func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string // any error as it is not necessary for the handler to reply back a // response to the client request. func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { + auditObjectErasureSet(ctx, object, &er) + if opts.DeletePrefix { return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object) } diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 0889ebfa6..11c738d1e 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -541,8 +541,13 @@ func (z *erasureServerPools) Init(ctx context.Context) error { } func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) { - defer gr.Close() objInfo := gr.ObjInfo + + defer func() { + gr.Close() + auditLogDecom(ctx, "DecomCopyData", objInfo.Bucket, objInfo.Name, objInfo.VersionID, err) + }() + if objInfo.isMultipart() { uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{ VersionID: objInfo.VersionID, @@ -603,6 +608,8 @@ func (v versionsSorter) reverse() { } func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error { + ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{}) + var wg sync.WaitGroup wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets))) workerSize, err := strconv.Atoi(wStr) @@ -713,13 +720,17 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool // if all versions were decommissioned, then we can delete the object versions. if decommissionedCount == len(fivs.Versions) { - set.DeleteObject(ctx, + _, err := set.DeleteObject(ctx, bName, entry.name, ObjectOptions{ DeletePrefix: true, // use prefix delete to delete all versions at once. }, ) + auditLogDecom(ctx, "DecomDeleteObject", bName, entry.name, "", err) + if err != nil { + logger.LogIf(ctx, err) + } } z.poolMetaMutex.Lock() z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name) @@ -804,6 +815,9 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in dctx, z.decommissionCancelers[idx] = context.WithCancel(GlobalContext) z.poolMetaMutex.Unlock() + // Generate an empty request info so it can be directly modified later by audit + dctx = logger.SetReqInfo(dctx, &logger.ReqInfo{}) + if err := z.decommissionInBackground(dctx, idx); err != nil { logger.LogIf(GlobalContext, err) logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx)) @@ -1075,3 +1089,16 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er globalNotificationSys.ReloadPoolMeta(ctx) return nil } + +func auditLogDecom(ctx context.Context, apiName, bucket, object, versionID string, err error) { + errStr := "" + if err != nil { + errStr = err.Error() + } + auditLogInternal(ctx, bucket, object, AuditLogOptions{ + Trigger: "decommissioning", + APIName: apiName, + VersionID: versionID, + Error: errStr, + }) +} diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index e926268ba..61964f1b4 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -572,6 +572,7 @@ func (a *auditObjectErasureMap) MarshalJSON() ([]byte, error) { return json.Marshal(mapCopy) } +// Add erasure set information to the current context func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects) { if len(logger.AuditTargets()) == 0 { return @@ -933,21 +934,18 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, er // GetObjectNInfo - returns object info and locked object ReadCloser func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) } // PutObject - writes an object to hashedSet based on the object name. func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.PutObject(ctx, bucket, object, data, opts) } // GetObjectInfo - reads object metadata from the hashedSet based on the object name. func (s *erasureSets) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.GetObjectInfo(ctx, bucket, object, opts) } @@ -967,13 +965,11 @@ func (s *erasureSets) deletePrefix(ctx context.Context, bucket string, prefix st // DeleteObject - deletes an object from the hashedSet based on the object name. func (s *erasureSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) - if opts.DeletePrefix { err := s.deletePrefix(ctx, bucket, object) return ObjectInfo{}, err } + set := s.getHashedSet(object) return set.DeleteObject(ctx, bucket, object, opts) } @@ -1030,9 +1026,6 @@ func (s *erasureSets) DeleteObjects(ctx context.Context, bucket string, objects for i, obj := range group { delErrs[obj.origIndex] = errs[i] delObjects[obj.origIndex] = dobjects[i] - if errs[i] == nil { - auditObjectErasureSet(ctx, obj.object.ObjectName, set) - } } }(s.sets[setIdx], objsGroup) } @@ -1046,8 +1039,6 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB srcSet := s.getHashedSet(srcObject) dstSet := s.getHashedSet(dstObject) - auditObjectErasureSet(ctx, dstObject, dstSet) - cpSrcDstSame := srcSet == dstSet // Check if this request is only metadata update. if cpSrcDstSame && srcInfo.metadataOnly { @@ -1086,14 +1077,12 @@ func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, // In list multipart uploads we are going to treat input prefix as the object, // this means that we are not supporting directory navigation. set := s.getHashedSet(prefix) - auditObjectErasureSet(ctx, prefix, set) return set.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) } // Initiate a new multipart upload on a hashedSet based on object name. func (s *erasureSets) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.NewMultipartUpload(ctx, bucket, object, opts) } @@ -1102,42 +1091,36 @@ func (s *erasureSets) CopyObjectPart(ctx context.Context, srcBucket, srcObject, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions, ) (partInfo PartInfo, err error) { destSet := s.getHashedSet(destObject) - auditObjectErasureSet(ctx, destObject, destSet) return destSet.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts) } // PutObjectPart - writes part of an object to hashedSet based on the object name. func (s *erasureSets) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) } // GetMultipartInfo - return multipart metadata info uploaded at hashedSet. func (s *erasureSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (result MultipartInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.GetMultipartInfo(ctx, bucket, object, uploadID, opts) } // ListObjectParts - lists all uploaded parts to an object in hashedSet. func (s *erasureSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) } // Aborts an in-progress multipart operation on hashedSet based on the object name. func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) } // CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name. func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set) return set.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) } diff --git a/cmd/utils.go b/cmd/utils.go index cdabf15c5..02a1e3b46 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -1019,12 +1019,14 @@ type AuditLogOptions struct { APIName string Status string VersionID string + Error string } // sends audit logs for internal subsystem activity func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogOptions) { entry := audit.NewEntry(globalDeploymentID) entry.Trigger = opts.Trigger + entry.Error = opts.Error entry.API.Name = opts.APIName entry.API.Bucket = bucket entry.API.Object = object @@ -1033,6 +1035,11 @@ func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogO entry.ReqQuery[xhttp.VersionID] = opts.VersionID } entry.API.Status = opts.Status + // Merge tag information if found - this is currently needed for tags + // set during decommissioning. + if reqInfo := logger.GetReqInfo(ctx); reqInfo != nil { + entry.Tags = reqInfo.GetTagsMap() + } ctx = logger.SetAuditEntry(ctx, &entry) logger.AuditLog(ctx, nil, nil, nil) } diff --git a/internal/logger/message/audit/entry.go b/internal/logger/message/audit/entry.go index b2286800b..ef02d1dce 100644 --- a/internal/logger/message/audit/entry.go +++ b/internal/logger/message/audit/entry.go @@ -61,6 +61,8 @@ type Entry struct { ReqHeader map[string]string `json:"requestHeader,omitempty"` RespHeader map[string]string `json:"responseHeader,omitempty"` Tags map[string]interface{} `json:"tags,omitempty"` + + Error string `json:"error,omitempty"` } // NewEntry - constructs an audit entry object with some fields filled