mirror of https://github.com/minio/minio.git
tagging: Add event notif for PUT object tagging (#11366)
An optimization to avoid double calling for during PutObject tagging
This commit is contained in:
parent
6ef678663e
commit
e96fdcd5ec
|
@ -1146,7 +1146,14 @@ func (er erasureObjects) addPartial(bucket, object, versionID string) {
|
|||
}
|
||||
|
||||
// PutObjectTags - replace or add tags to an existing object
|
||||
func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error {
|
||||
func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
// Lock the object before updating tags.
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
|
||||
disks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
|
@ -1154,7 +1161,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
|||
|
||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// List all online disks.
|
||||
|
@ -1163,13 +1170,13 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
|||
// Pick latest valid metadata.
|
||||
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
if fi.Deleted {
|
||||
if opts.VersionID == "" {
|
||||
return toObjectErr(errFileNotFound, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(errFileNotFound, bucket, object)
|
||||
}
|
||||
return toObjectErr(errMethodNotAllowed, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
|
||||
}
|
||||
|
||||
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi.Erasure.Distribution)
|
||||
|
@ -1192,15 +1199,18 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
|||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Atomically rename metadata from tmp location to destination for each disk.
|
||||
if _, err = renameFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
return nil
|
||||
objInfo := fi.ToObjectInfo(bucket, object)
|
||||
objInfo.UserTags = tags
|
||||
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
// updateObjectMeta will update the metadata of a file.
|
||||
|
@ -1264,7 +1274,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
|
|||
}
|
||||
|
||||
// DeleteObjectTags - delete object tags from an existing object
|
||||
func (er erasureObjects) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
||||
func (er erasureObjects) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
return er.PutObjectTags(ctx, bucket, object, "", opts)
|
||||
}
|
||||
|
||||
|
|
|
@ -1542,59 +1542,59 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
|
|||
}
|
||||
|
||||
// PutObjectTags - replace or add tags to an existing object
|
||||
func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error {
|
||||
func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
object = encodeDirObject(object)
|
||||
if z.SinglePool() {
|
||||
return z.serverPools[0].PutObjectTags(ctx, bucket, object, tags, opts)
|
||||
}
|
||||
|
||||
for _, pool := range z.serverPools {
|
||||
err := pool.PutObjectTags(ctx, bucket, object, tags, opts)
|
||||
objInfo, err := pool.PutObjectTags(ctx, bucket, object, tags, opts)
|
||||
if err != nil {
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
return nil
|
||||
return objInfo, nil
|
||||
}
|
||||
if opts.VersionID != "" {
|
||||
return VersionNotFound{
|
||||
return ObjectInfo{}, VersionNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
VersionID: opts.VersionID,
|
||||
}
|
||||
}
|
||||
return ObjectNotFound{
|
||||
return ObjectInfo{}, ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteObjectTags - delete object tags from an existing object
|
||||
func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
||||
func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
object = encodeDirObject(object)
|
||||
if z.SinglePool() {
|
||||
return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts)
|
||||
}
|
||||
for _, pool := range z.serverPools {
|
||||
err := pool.DeleteObjectTags(ctx, bucket, object, opts)
|
||||
objInfo, err := pool.DeleteObjectTags(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
return nil
|
||||
return objInfo, nil
|
||||
}
|
||||
if opts.VersionID != "" {
|
||||
return VersionNotFound{
|
||||
return ObjectInfo{}, VersionNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
VersionID: opts.VersionID,
|
||||
}
|
||||
}
|
||||
return ObjectNotFound{
|
||||
return ObjectInfo{}, ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
|
|
|
@ -1384,13 +1384,13 @@ func (s *erasureSets) HealObject(ctx context.Context, bucket, object, versionID
|
|||
}
|
||||
|
||||
// PutObjectTags - replace or add tags to an existing object
|
||||
func (s *erasureSets) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error {
|
||||
func (s *erasureSets) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
er := s.getHashedSet(object)
|
||||
return er.PutObjectTags(ctx, bucket, object, tags, opts)
|
||||
}
|
||||
|
||||
// DeleteObjectTags - delete object tags from an existing object
|
||||
func (s *erasureSets) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
||||
func (s *erasureSets) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
er := s.getHashedSet(object)
|
||||
return er.DeleteObjectTags(ctx, bucket, object, opts)
|
||||
}
|
||||
|
|
19
cmd/fs-v1.go
19
cmd/fs-v1.go
|
@ -1475,9 +1475,9 @@ func (fs *FSObjects) GetObjectTags(ctx context.Context, bucket, object string, o
|
|||
}
|
||||
|
||||
// PutObjectTags - replace or add tags to an existing object
|
||||
func (fs *FSObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error {
|
||||
func (fs *FSObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
if opts.VersionID != "" && opts.VersionID != nullVersionID {
|
||||
return VersionNotFound{
|
||||
return ObjectInfo{}, VersionNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
VersionID: opts.VersionID,
|
||||
|
@ -1491,7 +1491,7 @@ func (fs *FSObjects) PutObjectTags(ctx context.Context, bucket, object string, t
|
|||
wlk, err = fs.rwPool.Create(fsMetaPath)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
}
|
||||
// This close will allow for locks to be synchronized on `fs.json`.
|
||||
|
@ -1512,13 +1512,20 @@ func (fs *FSObjects) PutObjectTags(ctx context.Context, bucket, object string, t
|
|||
}
|
||||
|
||||
if _, err = fsMeta.WriteTo(wlk); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
return nil
|
||||
|
||||
// Stat the file to get file size.
|
||||
fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
|
||||
if err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
||||
}
|
||||
|
||||
// DeleteObjectTags - delete object tags from an existing object
|
||||
func (fs *FSObjects) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
||||
func (fs *FSObjects) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
return fs.PutObjectTags(ctx, bucket, object, "", opts)
|
||||
}
|
||||
|
||||
|
|
|
@ -208,9 +208,9 @@ func (a GatewayUnsupported) GetMetrics(ctx context.Context) (*BackendMetrics, er
|
|||
}
|
||||
|
||||
// PutObjectTags - not implemented.
|
||||
func (a GatewayUnsupported) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error {
|
||||
func (a GatewayUnsupported) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return NotImplemented{}
|
||||
return ObjectInfo{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// GetObjectTags - not implemented.
|
||||
|
@ -220,9 +220,9 @@ func (a GatewayUnsupported) GetObjectTags(ctx context.Context, bucket, object st
|
|||
}
|
||||
|
||||
// DeleteObjectTags - not implemented.
|
||||
func (a GatewayUnsupported) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
||||
func (a GatewayUnsupported) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return NotImplemented{}
|
||||
return ObjectInfo{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
|
||||
|
|
|
@ -728,23 +728,34 @@ func (l *s3Objects) GetObjectTags(ctx context.Context, bucket string, object str
|
|||
}
|
||||
|
||||
// PutObjectTags attaches the tags to the object
|
||||
func (l *s3Objects) PutObjectTags(ctx context.Context, bucket, object string, tagStr string, opts minio.ObjectOptions) error {
|
||||
func (l *s3Objects) PutObjectTags(ctx context.Context, bucket, object string, tagStr string, opts minio.ObjectOptions) (minio.ObjectInfo, error) {
|
||||
tagObj, err := tags.Parse(tagStr, true)
|
||||
if err != nil {
|
||||
return minio.ErrorRespToObjectError(err, bucket, object)
|
||||
return minio.ObjectInfo{}, minio.ErrorRespToObjectError(err, bucket, object)
|
||||
}
|
||||
if err = l.Client.PutObjectTagging(ctx, bucket, object, tagObj, miniogo.PutObjectTaggingOptions{}); err != nil {
|
||||
return minio.ErrorRespToObjectError(err, bucket, object)
|
||||
if err = l.Client.PutObjectTagging(ctx, bucket, object, tagObj, miniogo.PutObjectTaggingOptions{VersionID: opts.VersionID}); err != nil {
|
||||
return minio.ObjectInfo{}, minio.ErrorRespToObjectError(err, bucket, object)
|
||||
}
|
||||
return nil
|
||||
|
||||
objInfo, err := l.GetObjectInfo(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, minio.ErrorRespToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
// DeleteObjectTags removes the tags attached to the object
|
||||
func (l *s3Objects) DeleteObjectTags(ctx context.Context, bucket, object string, opts minio.ObjectOptions) error {
|
||||
func (l *s3Objects) DeleteObjectTags(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) {
|
||||
if err := l.Client.RemoveObjectTagging(ctx, bucket, object, miniogo.RemoveObjectTaggingOptions{}); err != nil {
|
||||
return minio.ErrorRespToObjectError(err, bucket, object)
|
||||
return minio.ObjectInfo{}, minio.ErrorRespToObjectError(err, bucket, object)
|
||||
}
|
||||
return nil
|
||||
objInfo, err := l.GetObjectInfo(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, minio.ErrorRespToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
// IsCompressionSupported returns whether compression is applicable for this layer.
|
||||
|
|
|
@ -157,7 +157,7 @@ type ObjectLayer interface {
|
|||
Health(ctx context.Context, opts HealthOptions) HealthResult
|
||||
|
||||
// ObjectTagging operations
|
||||
PutObjectTags(context.Context, string, string, string, ObjectOptions) error
|
||||
PutObjectTags(context.Context, string, string, string, ObjectOptions) (ObjectInfo, error)
|
||||
GetObjectTags(context.Context, string, string, ObjectOptions) (*tags.Tags, error)
|
||||
DeleteObjectTags(context.Context, string, string, ObjectOptions) error
|
||||
DeleteObjectTags(context.Context, string, string, ObjectOptions) (ObjectInfo, error)
|
||||
}
|
||||
|
|
|
@ -3281,24 +3281,35 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
|
|||
opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
|
||||
}
|
||||
|
||||
tagsStr := tags.String()
|
||||
|
||||
// Put object tags
|
||||
err = objAPI.PutObjectTags(ctx, bucket, object, tags.String(), opts)
|
||||
objInfo, err := objAPI.PutObjectTags(ctx, bucket, object, tagsStr, opts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
if replicate {
|
||||
if objInfo, err := objAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
|
||||
scheduleReplication(ctx, objInfo, objAPI, sync)
|
||||
}
|
||||
}
|
||||
|
||||
if opts.VersionID != "" {
|
||||
w.Header()[xhttp.AmzVersionID] = []string{opts.VersionID}
|
||||
if objInfo.VersionID != "" {
|
||||
w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
|
||||
}
|
||||
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
|
||||
sendEvent(eventArgs{
|
||||
EventName: event.ObjectCreatedPutTagging,
|
||||
BucketName: bucket,
|
||||
Object: objInfo,
|
||||
ReqParams: extractReqParams(r),
|
||||
RespElements: extractRespElements(w),
|
||||
UserAgent: r.UserAgent(),
|
||||
Host: handlers.GetSourceIP(r),
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// DeleteObjectTaggingHandler - DELETE object tagging
|
||||
|
@ -3345,21 +3356,31 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
|
|||
opts.UserDefined = make(map[string]string)
|
||||
opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
|
||||
}
|
||||
// Delete object tags
|
||||
if err = objAPI.DeleteObjectTags(ctx, bucket, object, opts); err != nil {
|
||||
|
||||
if _, err = objAPI.DeleteObjectTags(ctx, bucket, object, opts); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
if opts.VersionID != "" {
|
||||
w.Header()[xhttp.AmzVersionID] = []string{opts.VersionID}
|
||||
}
|
||||
oi.UserTags = ""
|
||||
|
||||
if replicate {
|
||||
scheduleReplication(ctx, oi, objAPI, sync)
|
||||
}
|
||||
|
||||
if oi.VersionID != "" {
|
||||
w.Header()[xhttp.AmzVersionID] = []string{oi.VersionID}
|
||||
}
|
||||
writeSuccessNoContent(w)
|
||||
|
||||
sendEvent(eventArgs{
|
||||
EventName: event.ObjectCreatedDeleteTagging,
|
||||
BucketName: bucket,
|
||||
Object: oi,
|
||||
ReqParams: extractReqParams(r),
|
||||
RespElements: extractRespElements(w),
|
||||
UserAgent: r.UserAgent(),
|
||||
Host: handlers.GetSourceIP(r),
|
||||
})
|
||||
}
|
||||
|
||||
// RestoreObjectHandler - POST restore object handler.
|
||||
|
|
|
@ -41,6 +41,8 @@ const (
|
|||
ObjectCreatedPut
|
||||
ObjectCreatedPutRetention
|
||||
ObjectCreatedPutLegalHold
|
||||
ObjectCreatedPutTagging
|
||||
ObjectCreatedDeleteTagging
|
||||
ObjectRemovedAll
|
||||
ObjectRemovedDelete
|
||||
ObjectRemovedDeleteMarkerCreated
|
||||
|
@ -77,6 +79,7 @@ func (name Name) Expand() []Name {
|
|||
ObjectCreatedCompleteMultipartUpload, ObjectCreatedCopy,
|
||||
ObjectCreatedPost, ObjectCreatedPut,
|
||||
ObjectCreatedPutRetention, ObjectCreatedPutLegalHold,
|
||||
ObjectCreatedPutTagging, ObjectCreatedDeleteTagging,
|
||||
ObjectReplicationComplete, ObjectReplicationFailed,
|
||||
}
|
||||
case ObjectRemovedAll:
|
||||
|
@ -134,6 +137,10 @@ func (name Name) String() string {
|
|||
return "s3:ObjectCreated:Post"
|
||||
case ObjectCreatedPut:
|
||||
return "s3:ObjectCreated:Put"
|
||||
case ObjectCreatedPutTagging:
|
||||
return "s3:ObjectCreated:PutTagging"
|
||||
case ObjectCreatedDeleteTagging:
|
||||
return "s3:ObjectCreated:DeleteTagging"
|
||||
case ObjectCreatedPutRetention:
|
||||
return "s3:ObjectCreated:PutRetention"
|
||||
case ObjectCreatedPutLegalHold:
|
||||
|
@ -244,6 +251,10 @@ func ParseName(s string) (Name, error) {
|
|||
return ObjectCreatedPutRetention, nil
|
||||
case "s3:ObjectCreated:PutLegalHold":
|
||||
return ObjectCreatedPutLegalHold, nil
|
||||
case "s3:ObjectCreated:PutTagging":
|
||||
return ObjectCreatedPutTagging, nil
|
||||
case "s3:ObjectCreated:DeleteTagging":
|
||||
return ObjectCreatedDeleteTagging, nil
|
||||
case "s3:ObjectRemoved:*":
|
||||
return ObjectRemovedAll, nil
|
||||
case "s3:ObjectRemoved:Delete":
|
||||
|
|
|
@ -31,8 +31,9 @@ func TestNameExpand(t *testing.T) {
|
|||
{BucketCreated, []Name{BucketCreated}},
|
||||
{BucketRemoved, []Name{BucketRemoved}},
|
||||
{ObjectAccessedAll, []Name{ObjectAccessedGet, ObjectAccessedHead, ObjectAccessedGetRetention, ObjectAccessedGetLegalHold}},
|
||||
{ObjectCreatedAll, []Name{ObjectCreatedCompleteMultipartUpload, ObjectCreatedCopy,
|
||||
ObjectCreatedPost, ObjectCreatedPut, ObjectCreatedPutRetention, ObjectCreatedPutLegalHold, ObjectReplicationComplete, ObjectReplicationFailed}},
|
||||
{ObjectCreatedAll, []Name{ObjectCreatedCompleteMultipartUpload, ObjectCreatedCopy, ObjectCreatedPost, ObjectCreatedPut,
|
||||
ObjectCreatedPutRetention, ObjectCreatedPutLegalHold, ObjectCreatedPutTagging, ObjectCreatedDeleteTagging,
|
||||
ObjectReplicationComplete, ObjectReplicationFailed}},
|
||||
{ObjectRemovedAll, []Name{ObjectRemovedDelete, ObjectRemovedDeleteMarkerCreated}},
|
||||
{ObjectAccessedHead, []Name{ObjectAccessedHead}},
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue