mirror of
https://github.com/minio/minio.git
synced 2025-04-20 02:27:50 -04:00
Add audit log for decommissioning (#14858)
This commit is contained in:
parent
0a256053ee
commit
44a3b58e52
@ -190,6 +190,8 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
|||||||
// towards simplification of multipart APIs.
|
// towards simplification of multipart APIs.
|
||||||
// The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
|
// The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
|
||||||
func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
result.MaxUploads = maxUploads
|
result.MaxUploads = maxUploads
|
||||||
result.KeyMarker = keyMarker
|
result.KeyMarker = keyMarker
|
||||||
result.Prefix = object
|
result.Prefix = object
|
||||||
@ -371,6 +373,8 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
|
|||||||
//
|
//
|
||||||
// Implements S3 compatible initiate multipart API.
|
// Implements S3 compatible initiate multipart API.
|
||||||
func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
|
func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
// No metadata is set, allocate a new one.
|
// No metadata is set, allocate a new one.
|
||||||
if opts.UserDefined == nil {
|
if opts.UserDefined == nil {
|
||||||
opts.UserDefined = make(map[string]string)
|
opts.UserDefined = make(map[string]string)
|
||||||
@ -446,6 +450,8 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds
|
|||||||
//
|
//
|
||||||
// Implements S3 compatible Upload Part API.
|
// Implements S3 compatible Upload Part API.
|
||||||
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
// Write lock for this part ID.
|
// Write lock for this part ID.
|
||||||
// Held throughout the operation.
|
// Held throughout the operation.
|
||||||
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
|
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
|
||||||
@ -683,6 +689,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
// - encrypted
|
// - encrypted
|
||||||
// - compressed
|
// - compressed
|
||||||
func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
result := MultipartInfo{
|
result := MultipartInfo{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
Object: object,
|
Object: object,
|
||||||
@ -739,6 +747,8 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
|||||||
// ListPartsInfo structure is marshaled directly into XML and
|
// ListPartsInfo structure is marshaled directly into XML and
|
||||||
// replied back to the client.
|
// replied back to the client.
|
||||||
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -832,6 +842,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
|||||||
//
|
//
|
||||||
// Implements S3 compatible Complete multipart API.
|
// Implements S3 compatible Complete multipart API.
|
||||||
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
|
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
// Hold read-locks to verify uploaded parts, also disallows
|
// Hold read-locks to verify uploaded parts, also disallows
|
||||||
// parallel part uploads as well.
|
// parallel part uploads as well.
|
||||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
@ -1028,6 +1040,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
// would be removed from the system, rollback is not possible on this
|
// would be removed from the system, rollback is not possible on this
|
||||||
// operation.
|
// operation.
|
||||||
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
|
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -63,6 +63,8 @@ func countOnlineDisks(onlineDisks []StorageAPI) (online int) {
|
|||||||
// if source object and destination object are same we only
|
// if source object and destination object are same we only
|
||||||
// update metadata.
|
// update metadata.
|
||||||
func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) {
|
func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) {
|
||||||
|
auditObjectErasureSet(ctx, dstObject, &er)
|
||||||
|
|
||||||
// This call shouldn't be used for anything other than metadata updates or adding self referential versions.
|
// This call shouldn't be used for anything other than metadata updates or adding self referential versions.
|
||||||
if !srcInfo.metadataOnly {
|
if !srcInfo.metadataOnly {
|
||||||
return oi, NotImplemented{}
|
return oi, NotImplemented{}
|
||||||
@ -177,6 +179,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
|||||||
// GetObjectNInfo - returns object info and an object
|
// GetObjectNInfo - returns object info and an object
|
||||||
// Read(Closer). When err != nil, the returned reader is always nil.
|
// Read(Closer). When err != nil, the returned reader is always nil.
|
||||||
func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
var unlockOnDefer bool
|
var unlockOnDefer bool
|
||||||
nsUnlocker := func() {}
|
nsUnlocker := func() {}
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -413,6 +417,8 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
|
|||||||
|
|
||||||
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
||||||
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
|
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
if !opts.NoLock {
|
if !opts.NoLock {
|
||||||
// Lock the object before reading.
|
// Lock the object before reading.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
@ -832,6 +838,8 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st
|
|||||||
|
|
||||||
// putObject wrapper for erasureObjects PutObject
|
// putObject wrapper for erasureObjects PutObject
|
||||||
func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
data := r.Reader
|
data := r.Reader
|
||||||
|
|
||||||
// No metadata is set, allocate a new one.
|
// No metadata is set, allocate a new one.
|
||||||
@ -1148,6 +1156,10 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object
|
|||||||
// into smaller bulks if some object names are found to be duplicated in the delete list, splitting
|
// into smaller bulks if some object names are found to be duplicated in the delete list, splitting
|
||||||
// into smaller bulks will avoid holding twice the write lock of the duplicated object names.
|
// into smaller bulks will avoid holding twice the write lock of the duplicated object names.
|
||||||
func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
|
func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
|
||||||
|
for _, obj := range objects {
|
||||||
|
auditObjectErasureSet(ctx, obj.ObjectV.ObjectName, &er)
|
||||||
|
}
|
||||||
|
|
||||||
errs := make([]error, len(objects))
|
errs := make([]error, len(objects))
|
||||||
dobjects := make([]DeletedObject, len(objects))
|
dobjects := make([]DeletedObject, len(objects))
|
||||||
writeQuorums := make([]int, len(objects))
|
writeQuorums := make([]int, len(objects))
|
||||||
@ -1328,6 +1340,8 @@ func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string
|
|||||||
// any error as it is not necessary for the handler to reply back a
|
// any error as it is not necessary for the handler to reply back a
|
||||||
// response to the client request.
|
// response to the client request.
|
||||||
func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
|
|
||||||
if opts.DeletePrefix {
|
if opts.DeletePrefix {
|
||||||
return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object)
|
return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object)
|
||||||
}
|
}
|
||||||
|
@ -541,8 +541,13 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
|
func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
|
||||||
defer gr.Close()
|
|
||||||
objInfo := gr.ObjInfo
|
objInfo := gr.ObjInfo
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
gr.Close()
|
||||||
|
auditLogDecom(ctx, "DecomCopyData", objInfo.Bucket, objInfo.Name, objInfo.VersionID, err)
|
||||||
|
}()
|
||||||
|
|
||||||
if objInfo.isMultipart() {
|
if objInfo.isMultipart() {
|
||||||
uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{
|
uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{
|
||||||
VersionID: objInfo.VersionID,
|
VersionID: objInfo.VersionID,
|
||||||
@ -603,6 +608,8 @@ func (v versionsSorter) reverse() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
|
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
|
||||||
|
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets)))
|
wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets)))
|
||||||
workerSize, err := strconv.Atoi(wStr)
|
workerSize, err := strconv.Atoi(wStr)
|
||||||
@ -713,13 +720,17 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
|
|
||||||
// if all versions were decommissioned, then we can delete the object versions.
|
// if all versions were decommissioned, then we can delete the object versions.
|
||||||
if decommissionedCount == len(fivs.Versions) {
|
if decommissionedCount == len(fivs.Versions) {
|
||||||
set.DeleteObject(ctx,
|
_, err := set.DeleteObject(ctx,
|
||||||
bName,
|
bName,
|
||||||
entry.name,
|
entry.name,
|
||||||
ObjectOptions{
|
ObjectOptions{
|
||||||
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
auditLogDecom(ctx, "DecomDeleteObject", bName, entry.name, "", err)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(ctx, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
z.poolMetaMutex.Lock()
|
z.poolMetaMutex.Lock()
|
||||||
z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name)
|
z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name)
|
||||||
@ -804,6 +815,9 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
|
|||||||
dctx, z.decommissionCancelers[idx] = context.WithCancel(GlobalContext)
|
dctx, z.decommissionCancelers[idx] = context.WithCancel(GlobalContext)
|
||||||
z.poolMetaMutex.Unlock()
|
z.poolMetaMutex.Unlock()
|
||||||
|
|
||||||
|
// Generate an empty request info so it can be directly modified later by audit
|
||||||
|
dctx = logger.SetReqInfo(dctx, &logger.ReqInfo{})
|
||||||
|
|
||||||
if err := z.decommissionInBackground(dctx, idx); err != nil {
|
if err := z.decommissionInBackground(dctx, idx); err != nil {
|
||||||
logger.LogIf(GlobalContext, err)
|
logger.LogIf(GlobalContext, err)
|
||||||
logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
|
logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
|
||||||
@ -1075,3 +1089,16 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
|
|||||||
globalNotificationSys.ReloadPoolMeta(ctx)
|
globalNotificationSys.ReloadPoolMeta(ctx)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func auditLogDecom(ctx context.Context, apiName, bucket, object, versionID string, err error) {
|
||||||
|
errStr := ""
|
||||||
|
if err != nil {
|
||||||
|
errStr = err.Error()
|
||||||
|
}
|
||||||
|
auditLogInternal(ctx, bucket, object, AuditLogOptions{
|
||||||
|
Trigger: "decommissioning",
|
||||||
|
APIName: apiName,
|
||||||
|
VersionID: versionID,
|
||||||
|
Error: errStr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -572,6 +572,7 @@ func (a *auditObjectErasureMap) MarshalJSON() ([]byte, error) {
|
|||||||
return json.Marshal(mapCopy)
|
return json.Marshal(mapCopy)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add erasure set information to the current context
|
||||||
func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects) {
|
func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects) {
|
||||||
if len(logger.AuditTargets()) == 0 {
|
if len(logger.AuditTargets()) == 0 {
|
||||||
return
|
return
|
||||||
@ -933,21 +934,18 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, er
|
|||||||
// GetObjectNInfo - returns object info and locked object ReadCloser
|
// GetObjectNInfo - returns object info and locked object ReadCloser
|
||||||
func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
return set.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutObject - writes an object to hashedSet based on the object name.
|
// PutObject - writes an object to hashedSet based on the object name.
|
||||||
func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.PutObject(ctx, bucket, object, data, opts)
|
return set.PutObject(ctx, bucket, object, data, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectInfo - reads object metadata from the hashedSet based on the object name.
|
// GetObjectInfo - reads object metadata from the hashedSet based on the object name.
|
||||||
func (s *erasureSets) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (s *erasureSets) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.GetObjectInfo(ctx, bucket, object, opts)
|
return set.GetObjectInfo(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -967,13 +965,11 @@ func (s *erasureSets) deletePrefix(ctx context.Context, bucket string, prefix st
|
|||||||
|
|
||||||
// DeleteObject - deletes an object from the hashedSet based on the object name.
|
// DeleteObject - deletes an object from the hashedSet based on the object name.
|
||||||
func (s *erasureSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (s *erasureSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
set := s.getHashedSet(object)
|
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
|
|
||||||
if opts.DeletePrefix {
|
if opts.DeletePrefix {
|
||||||
err := s.deletePrefix(ctx, bucket, object)
|
err := s.deletePrefix(ctx, bucket, object)
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
|
set := s.getHashedSet(object)
|
||||||
return set.DeleteObject(ctx, bucket, object, opts)
|
return set.DeleteObject(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1030,9 +1026,6 @@ func (s *erasureSets) DeleteObjects(ctx context.Context, bucket string, objects
|
|||||||
for i, obj := range group {
|
for i, obj := range group {
|
||||||
delErrs[obj.origIndex] = errs[i]
|
delErrs[obj.origIndex] = errs[i]
|
||||||
delObjects[obj.origIndex] = dobjects[i]
|
delObjects[obj.origIndex] = dobjects[i]
|
||||||
if errs[i] == nil {
|
|
||||||
auditObjectErasureSet(ctx, obj.object.ObjectName, set)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}(s.sets[setIdx], objsGroup)
|
}(s.sets[setIdx], objsGroup)
|
||||||
}
|
}
|
||||||
@ -1046,8 +1039,6 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
|
|||||||
srcSet := s.getHashedSet(srcObject)
|
srcSet := s.getHashedSet(srcObject)
|
||||||
dstSet := s.getHashedSet(dstObject)
|
dstSet := s.getHashedSet(dstObject)
|
||||||
|
|
||||||
auditObjectErasureSet(ctx, dstObject, dstSet)
|
|
||||||
|
|
||||||
cpSrcDstSame := srcSet == dstSet
|
cpSrcDstSame := srcSet == dstSet
|
||||||
// Check if this request is only metadata update.
|
// Check if this request is only metadata update.
|
||||||
if cpSrcDstSame && srcInfo.metadataOnly {
|
if cpSrcDstSame && srcInfo.metadataOnly {
|
||||||
@ -1086,14 +1077,12 @@ func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix,
|
|||||||
// In list multipart uploads we are going to treat input prefix as the object,
|
// In list multipart uploads we are going to treat input prefix as the object,
|
||||||
// this means that we are not supporting directory navigation.
|
// this means that we are not supporting directory navigation.
|
||||||
set := s.getHashedSet(prefix)
|
set := s.getHashedSet(prefix)
|
||||||
auditObjectErasureSet(ctx, prefix, set)
|
|
||||||
return set.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
|
return set.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initiate a new multipart upload on a hashedSet based on object name.
|
// Initiate a new multipart upload on a hashedSet based on object name.
|
||||||
func (s *erasureSets) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) {
|
func (s *erasureSets) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.NewMultipartUpload(ctx, bucket, object, opts)
|
return set.NewMultipartUpload(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1102,42 +1091,36 @@ func (s *erasureSets) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
|
|||||||
startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions,
|
startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions,
|
||||||
) (partInfo PartInfo, err error) {
|
) (partInfo PartInfo, err error) {
|
||||||
destSet := s.getHashedSet(destObject)
|
destSet := s.getHashedSet(destObject)
|
||||||
auditObjectErasureSet(ctx, destObject, destSet)
|
|
||||||
return destSet.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts)
|
return destSet.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutObjectPart - writes part of an object to hashedSet based on the object name.
|
// PutObjectPart - writes part of an object to hashedSet based on the object name.
|
||||||
func (s *erasureSets) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) {
|
func (s *erasureSets) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
return set.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMultipartInfo - return multipart metadata info uploaded at hashedSet.
|
// GetMultipartInfo - return multipart metadata info uploaded at hashedSet.
|
||||||
func (s *erasureSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (result MultipartInfo, err error) {
|
func (s *erasureSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (result MultipartInfo, err error) {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
return set.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
|
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
|
||||||
func (s *erasureSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
func (s *erasureSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
return set.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aborts an in-progress multipart operation on hashedSet based on the object name.
|
// Aborts an in-progress multipart operation on hashedSet based on the object name.
|
||||||
func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
|
func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
|
return set.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
|
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
|
||||||
func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
set := s.getHashedSet(object)
|
set := s.getHashedSet(object)
|
||||||
auditObjectErasureSet(ctx, object, set)
|
|
||||||
return set.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
|
return set.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1019,12 +1019,14 @@ type AuditLogOptions struct {
|
|||||||
APIName string
|
APIName string
|
||||||
Status string
|
Status string
|
||||||
VersionID string
|
VersionID string
|
||||||
|
Error string
|
||||||
}
|
}
|
||||||
|
|
||||||
// sends audit logs for internal subsystem activity
|
// sends audit logs for internal subsystem activity
|
||||||
func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogOptions) {
|
func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogOptions) {
|
||||||
entry := audit.NewEntry(globalDeploymentID)
|
entry := audit.NewEntry(globalDeploymentID)
|
||||||
entry.Trigger = opts.Trigger
|
entry.Trigger = opts.Trigger
|
||||||
|
entry.Error = opts.Error
|
||||||
entry.API.Name = opts.APIName
|
entry.API.Name = opts.APIName
|
||||||
entry.API.Bucket = bucket
|
entry.API.Bucket = bucket
|
||||||
entry.API.Object = object
|
entry.API.Object = object
|
||||||
@ -1033,6 +1035,11 @@ func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogO
|
|||||||
entry.ReqQuery[xhttp.VersionID] = opts.VersionID
|
entry.ReqQuery[xhttp.VersionID] = opts.VersionID
|
||||||
}
|
}
|
||||||
entry.API.Status = opts.Status
|
entry.API.Status = opts.Status
|
||||||
|
// Merge tag information if found - this is currently needed for tags
|
||||||
|
// set during decommissioning.
|
||||||
|
if reqInfo := logger.GetReqInfo(ctx); reqInfo != nil {
|
||||||
|
entry.Tags = reqInfo.GetTagsMap()
|
||||||
|
}
|
||||||
ctx = logger.SetAuditEntry(ctx, &entry)
|
ctx = logger.SetAuditEntry(ctx, &entry)
|
||||||
logger.AuditLog(ctx, nil, nil, nil)
|
logger.AuditLog(ctx, nil, nil, nil)
|
||||||
}
|
}
|
||||||
|
@ -61,6 +61,8 @@ type Entry struct {
|
|||||||
ReqHeader map[string]string `json:"requestHeader,omitempty"`
|
ReqHeader map[string]string `json:"requestHeader,omitempty"`
|
||||||
RespHeader map[string]string `json:"responseHeader,omitempty"`
|
RespHeader map[string]string `json:"responseHeader,omitempty"`
|
||||||
Tags map[string]interface{} `json:"tags,omitempty"`
|
Tags map[string]interface{} `json:"tags,omitempty"`
|
||||||
|
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEntry - constructs an audit entry object with some fields filled
|
// NewEntry - constructs an audit entry object with some fields filled
|
||||||
|
Loading…
x
Reference in New Issue
Block a user