From 2ff655a745db86eba6cb7d3f4459dd083f007e1e Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Wed, 25 Nov 2020 11:24:50 -0800 Subject: [PATCH] Refactor replication, ILM handling in DELETE API (#10945) --- cmd/admin-bucket-handlers.go | 9 +-- cmd/api-datatypes.go | 4 ++ cmd/bucket-handlers.go | 96 +++++++++++++++++----------- cmd/bucket-lifecycle.go | 26 ++++++-- cmd/bucket-object-lock.go | 5 +- cmd/bucket-replication.go | 6 +- cmd/bucket-targets.go | 24 +++++-- cmd/data-crawler.go | 2 +- cmd/erasure-object.go | 2 + cmd/object-handlers.go | 48 +++++++++++--- cmd/web-handlers.go | 36 +++++++++-- cmd/xl-storage-format-v2.go | 3 + pkg/madmin/remote-target-commands.go | 13 ++-- 13 files changed, 187 insertions(+), 87 deletions(-) diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index e16eb751e..b8f6fd238 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -20,7 +20,6 @@ import ( "encoding/json" "io" "io/ioutil" - "net" "net/http" "github.com/gorilla/mux" @@ -165,12 +164,8 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http. writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) return } - host, port, err := net.SplitHostPort(target.Endpoint) - if err != nil { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) - return - } - sameTarget, _ := isLocalHost(host, port, globalMinioPort) + + sameTarget, _ := isLocalHost(target.URL().Hostname(), target.URL().Port(), globalMinioPort) if sameTarget && bucket == target.TargetBucket { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketRemoteIdenticalToSource), r.URL) return diff --git a/cmd/api-datatypes.go b/cmd/api-datatypes.go index 45aafd309..1351af8b9 100644 --- a/cmd/api-datatypes.go +++ b/cmd/api-datatypes.go @@ -35,6 +35,8 @@ type DeletedObject struct { DeleteMarkerMTime time.Time `xml:"DeleteMarkerMTime,omitempty"` // Status of versioned delete (of object or DeleteMarker) VersionPurgeStatus VersionPurgeStatusType `xml:"VersionPurgeStatus,omitempty"` + // PurgeTransitioned is nonempty if object is in transition tier + PurgeTransitioned string `xml:"PurgeTransitioned,omitempty"` } // ObjectToDelete carries key name for the object to delete. @@ -47,6 +49,8 @@ type ObjectToDelete struct { VersionPurgeStatus VersionPurgeStatusType `xml:"VersionPurgeStatus"` // Version ID of delete marker DeleteMarkerVersionID string `xml:"DeleteMarkerVersionId"` + // PurgeTransitioned is nonempty if object is in transition tier + PurgeTransitioned string `xml:"PurgeTransitioned"` } // createBucketConfiguration container for bucket configuration request from client. diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index ee29c1f6f..e18b8f21c 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -37,6 +37,7 @@ import ( "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/lifecycle" objectlock "github.com/minio/minio/pkg/bucket/object/lock" "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/replication" @@ -404,8 +405,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, if api.CacheAPI() != nil { getObjectInfoFn = api.CacheAPI().GetObjectInfo } + var ( + hasLockEnabled, hasLifecycleConfig bool + goi ObjectInfo + gerr error + ) replicateDeletes := hasReplicationRules(ctx, bucket, deleteObjects.Objects) - + if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled { + hasLockEnabled = true + } + if _, err := globalBucketMetadataSys.GetLifecycleConfig(bucket); err == nil { + hasLifecycleConfig = true + } dErrs := make([]DeleteError, len(deleteObjects.Objects)) for index, object := range deleteObjects.Objects { if apiErrCode := checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, object.ObjectName); apiErrCode != ErrNone { @@ -422,10 +433,33 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } continue } - + if replicateDeletes || hasLockEnabled || hasLifecycleConfig { + goi, gerr = getObjectInfoFn(ctx, bucket, object.ObjectName, ObjectOptions{ + VersionID: object.VersionID, + }) + } + if hasLifecycleConfig && gerr == nil { + object.PurgeTransitioned = goi.TransitionStatus + } + if replicateDeletes { + delMarker, replicate := checkReplicateDelete(ctx, bucket, ObjectToDelete{ + ObjectName: object.ObjectName, + VersionID: object.VersionID, + }, goi, gerr) + if replicate { + if object.VersionID != "" { + object.VersionPurgeStatus = Pending + if delMarker { + object.DeleteMarkerVersionID = object.VersionID + } + } else { + object.DeleteMarkerReplicationStatus = string(replication.Pending) + } + } + } if object.VersionID != "" { - if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled { - if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, getObjectInfoFn); apiErrCode != ErrNone { + if hasLockEnabled { + if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); apiErrCode != ErrNone { apiErr := errorCodes.ToAPIErr(apiErrCode) dErrs[index] = DeleteError{ Code: apiErr.Code, @@ -440,22 +474,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, // Avoid duplicate objects, we use map to filter them out. if _, ok := objectsToDelete[object]; !ok { - if replicateDeletes { - delMarker, replicate := checkReplicateDelete(ctx, getObjectInfoFn, bucket, ObjectToDelete{ - ObjectName: object.ObjectName, - VersionID: object.VersionID, - }) - if replicate { - if object.VersionID != "" { - object.VersionPurgeStatus = Pending - if delMarker { - object.DeleteMarkerVersionID = object.VersionID - } - } else { - object.DeleteMarkerReplicationStatus = string(replication.Pending) - } - } - } objectsToDelete[object] = index } } @@ -475,24 +493,17 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, Versioned: globalBucketVersioningSys.Enabled(bucket), VersionSuspended: globalBucketVersioningSys.Suspended(bucket), }) - deletedObjects := make([]DeletedObject, len(deleteObjects.Objects)) for i := range errs { - var dindex int - if replicateDeletes { - dindex = objectsToDelete[ObjectToDelete{ - ObjectName: dObjects[i].ObjectName, - VersionID: dObjects[i].VersionID, - DeleteMarkerVersionID: dObjects[i].DeleteMarkerVersionID, - VersionPurgeStatus: dObjects[i].VersionPurgeStatus, - DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus, - }] - } else { - dindex = objectsToDelete[ObjectToDelete{ - ObjectName: dObjects[i].ObjectName, - VersionID: dObjects[i].VersionID, - }] - } + + dindex := objectsToDelete[ObjectToDelete{ + ObjectName: dObjects[i].ObjectName, + VersionID: dObjects[i].VersionID, + DeleteMarkerVersionID: dObjects[i].DeleteMarkerVersionID, + VersionPurgeStatus: dObjects[i].VersionPurgeStatus, + DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus, + PurgeTransitioned: dObjects[i].PurgeTransitioned, + }] if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) { if replicateDeletes { dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus @@ -532,6 +543,19 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, }) } } + + if hasLifecycleConfig && dobj.PurgeTransitioned == lifecycle.TransitionComplete { // clean up transitioned tier + action := lifecycle.DeleteAction + if dobj.VersionID != "" { + action = lifecycle.DeleteVersionAction + } + deleteTransitionedObject(ctx, newObjectLayerFn(), bucket, dobj.ObjectName, lifecycle.ObjectOpts{ + Name: dobj.ObjectName, + VersionID: dobj.VersionID, + DeleteMarker: dobj.DeleteMarker, + }, action, true) + } + } // Notify deleted event for objects. for _, dobj := range deletedObjects { diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 03d64d4ec..d67dc2d71 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -111,7 +111,9 @@ func (t *transitionState) addWorker(ctx context.Context, objectAPI ObjectLayer) if !ok { return } - transitionObject(ctx, objectAPI, oi) + if err := transitionObject(ctx, objectAPI, oi); err != nil { + logger.LogIf(ctx, err) + } } } }() @@ -244,8 +246,9 @@ func putTransitionOpts(objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) { // handle deletes of transitioned objects or object versions when one of the following is true: // 1. temporarily restored copies of objects (restored with the PostRestoreObject API) expired. // 2. life cycle expiry date is met on the object. -func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, objInfo ObjectInfo, lcOpts lifecycle.ObjectOpts, action lifecycle.Action) error { - if objInfo.TransitionStatus == "" { +// 3. Object is removed through DELETE api call +func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, lcOpts lifecycle.ObjectOpts, action lifecycle.Action, isDeleteTierOnly bool) error { + if lcOpts.TransitionStatus == "" && !isDeleteTierOnly { return nil } lc, err := globalLifecycleSys.Get(bucket) @@ -263,29 +266,38 @@ func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket var opts ObjectOptions opts.Versioned = globalBucketVersioningSys.Enabled(bucket) - opts.VersionID = objInfo.VersionID + opts.VersionID = lcOpts.VersionID switch action { case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: // delete locally restored copy of object or object version // from the source, while leaving metadata behind. The data on // transitioned tier lies untouched and still accessible - opts.TransitionStatus = objInfo.TransitionStatus + opts.TransitionStatus = lcOpts.TransitionStatus _, err = objectAPI.DeleteObject(ctx, bucket, object, opts) return err case lifecycle.DeleteAction, lifecycle.DeleteVersionAction: // When an object is past expiry, delete the data from transitioned tier and // metadata from source - if err := tgt.RemoveObject(ctx, arn.Bucket, object, miniogo.RemoveObjectOptions{VersionID: objInfo.VersionID}); err != nil { + if err := tgt.RemoveObject(context.Background(), arn.Bucket, object, miniogo.RemoveObjectOptions{VersionID: lcOpts.VersionID}); err != nil { logger.LogIf(ctx, err) } + + if isDeleteTierOnly { + return nil + } _, err = objectAPI.DeleteObject(ctx, bucket, object, opts) if err != nil { return err } eventName := event.ObjectRemovedDelete - if objInfo.DeleteMarker { + if lcOpts.DeleteMarker { eventName = event.ObjectRemovedDeleteMarkerCreated } + objInfo := ObjectInfo{ + Name: object, + VersionID: lcOpts.VersionID, + DeleteMarker: lcOpts.DeleteMarker, + } // Notify object deleted event. sendEvent(eventArgs{ EventName: eventName, diff --git a/cmd/bucket-object-lock.go b/cmd/bucket-object-lock.go index c010e7232..d8ee45924 100644 --- a/cmd/bucket-object-lock.go +++ b/cmd/bucket-object-lock.go @@ -82,7 +82,7 @@ func enforceRetentionForDeletion(ctx context.Context, objInfo ObjectInfo) (locke // For objects in "Governance" mode, overwrite is allowed if a) object retention date is past OR // governance bypass headers are set and user has governance bypass permissions. // Objects in "Compliance" mode can be overwritten only if retention date is past. -func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucket string, object ObjectToDelete, getObjectInfoFn GetObjectInfoFn) APIErrorCode { +func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucket string, object ObjectToDelete, oi ObjectInfo, gerr error) APIErrorCode { opts, err := getOpts(ctx, r, bucket, object.ObjectName) if err != nil { return toAPIErrorCode(ctx, err) @@ -90,8 +90,7 @@ func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucke opts.VersionID = object.VersionID - oi, err := getObjectInfoFn(ctx, bucket, object.ObjectName, opts) - if err != nil { + if gerr != nil { // error from GetObjectInfo switch err.(type) { case MethodNotAllowed: // This happens usually for a delete marker if oi.DeleteMarker { diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 1bd0014f5..94f2c79ca 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -144,15 +144,14 @@ func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToD } // returns whether object version is a deletemarker and if object qualifies for replication -func checkReplicateDelete(ctx context.Context, getObjectInfoFn GetObjectInfoFn, bucket string, dobj ObjectToDelete) (dm, replicate bool) { +func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate bool) { rcfg, err := getReplicationConfig(ctx, bucket) if err != nil || rcfg == nil { return false, false } - oi, err := getObjectInfoFn(ctx, bucket, dobj.ObjectName, ObjectOptions{VersionID: dobj.VersionID}) // when incoming delete is removal of a delete marker( a.k.a versioned delete), // GetObjectInfo returns extra information even though it returns errFileNotFound - if err != nil { + if gerr != nil { validReplStatus := false switch oi.ReplicationStatus { case replication.Pending, replication.Complete, replication.Failed: @@ -469,6 +468,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}} _, err = tgt.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts) } + r.Close() if err != nil { replicationStatus = replication.Failed diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 9adc88d8a..c690db897 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "encoding/hex" "net/http" "strings" "sync" @@ -28,6 +29,7 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio/pkg/bucket/versioning" "github.com/minio/minio/pkg/madmin" + sha256 "github.com/minio/sha256-simd" ) // BucketTargetSys represents bucket targets subsystem @@ -349,7 +351,7 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*m getRemoteTargetInstanceTransport = newGatewayHTTPTransport(1 * time.Hour) }) - core, err := miniogo.NewCore(tcfg.Endpoint, &miniogo.Options{ + core, err := miniogo.NewCore(tcfg.URL().Host, &miniogo.Options{ Creds: creds, Secure: tcfg.Secure, Transport: getRemoteTargetInstanceTransport, @@ -364,18 +366,28 @@ func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTar } tgts := sys.targetsMap[bucket] for _, tgt := range tgts { - if tgt.Type == target.Type && tgt.TargetBucket == target.TargetBucket && target.URL() == tgt.URL() { + if tgt.Type == target.Type && tgt.TargetBucket == target.TargetBucket && target.URL().String() == tgt.URL().String() { return tgt.Arn } } if !madmin.ServiceType(target.Type).IsValid() { return "" } + return generateARN(target) +} + +// generate ARN that is unique to this target type +func generateARN(t *madmin.BucketTarget) string { + hash := sha256.New() + hash.Write([]byte(t.Type)) + hash.Write([]byte(t.Region)) + hash.Write([]byte(t.TargetBucket)) + hashSum := hex.EncodeToString(hash.Sum(nil)) arn := madmin.ARN{ - Type: target.Type, - ID: mustGetUUID(), - Region: target.Region, - Bucket: target.TargetBucket, + Type: t.Type, + ID: hashSum, + Region: t.Region, + Bucket: t.TargetBucket, } return arn.String() } diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 30f2ef2fe..fc504f2a9 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -772,7 +772,7 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action return 0 } if obj.TransitionStatus != "" { - if err := deleteTransitionedObject(ctx, o, i.bucket, i.objectPath(), obj, lcOpts, action); err != nil { + if err := deleteTransitionedObject(ctx, o, i.bucket, i.objectPath(), lcOpts, action, false); err != nil { logger.LogIf(ctx, err) return size } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index efdfa22db..47fccbb1b 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -905,6 +905,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec DeleteMarkerReplicationStatus: versions[objIndex].DeleteMarkerReplicationStatus, ObjectName: versions[objIndex].Name, VersionPurgeStatus: versions[objIndex].VersionPurgeStatus, + PurgeTransitioned: objects[objIndex].PurgeTransitioned, } } else { dobjects[objIndex] = DeletedObject{ @@ -912,6 +913,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec VersionID: versions[objIndex].VersionID, VersionPurgeStatus: versions[objIndex].VersionPurgeStatus, DeleteMarkerReplicationStatus: versions[objIndex].DeleteMarkerReplicationStatus, + PurgeTransitioned: objects[objIndex].PurgeTransitioned, } } } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 979099f39..597d5b8bf 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -2714,7 +2714,24 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - _, replicateDel := checkReplicateDelete(ctx, getObjectInfo, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}) + var ( + hasLockEnabled, hasLifecycleConfig bool + goi ObjectInfo + gerr error + ) + replicateDeletes := hasReplicationRules(ctx, bucket, []ObjectToDelete{{ObjectName: object, VersionID: opts.VersionID}}) + if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled { + hasLockEnabled = true + } + if _, err := globalBucketMetadataSys.GetLifecycleConfig(bucket); err == nil { + hasLifecycleConfig = true + } + if replicateDeletes || hasLockEnabled || hasLifecycleConfig { + goi, gerr = getObjectInfo(ctx, bucket, object, ObjectOptions{ + VersionID: opts.VersionID, + }) + } + _, replicateDel := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr) if replicateDel { if opts.VersionID != "" { opts.VersionPurgeStatus = Pending @@ -2722,7 +2739,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. opts.DeleteMarkerReplicationStatus = string(replication.Pending) } } - replicaDel := false + vID := opts.VersionID if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { // check if replica has permission to be deleted. if apiErrCode := checkRequestAuthType(ctx, r, policy.ReplicateDeleteAction, bucket, object); apiErrCode != ErrNone { @@ -2730,20 +2747,19 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. return } opts.DeleteMarkerReplicationStatus = replication.Replica.String() - replicaDel = true - } - vID := opts.VersionID - if replicaDel && opts.VersionPurgeStatus.Empty() { - // opts.VersionID holds delete marker version ID to replicate and not yet present on disk - vID = "" + if opts.VersionPurgeStatus.Empty() { + // opts.VersionID holds delete marker version ID to replicate and not yet present on disk + vID = "" + } } + apiErr := ErrNone if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled { if vID != "" { apiErr = enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{ ObjectName: object, VersionID: vID, - }, getObjectInfo) + }, goi, gerr) if apiErr != ErrNone && apiErr != ErrNoSuchKey { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(apiErr), r.URL, guessIsBrowserReq(r)) return @@ -2789,6 +2805,20 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. Bucket: bucket, }) } + if goi.TransitionStatus == lifecycle.TransitionComplete { // clean up transitioned tier + action := lifecycle.DeleteAction + if goi.VersionID != "" { + action = lifecycle.DeleteVersionAction + } + deleteTransitionedObject(ctx, newObjectLayerFn(), bucket, object, lifecycle.ObjectOpts{ + Name: object, + UserTags: goi.UserTags, + VersionID: goi.VersionID, + DeleteMarker: goi.DeleteMarker, + TransitionStatus: goi.TransitionStatus, + IsLatest: goi.IsLatest, + }, action, true) + } setPutObjHeaders(w, objInfo, true) writeSuccessNoContent(w) } diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 28fa37cbd..d00768d3b 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -47,6 +47,7 @@ import ( xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" + "github.com/minio/minio/pkg/bucket/lifecycle" objectlock "github.com/minio/minio/pkg/bucket/object/lock" "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/replication" @@ -737,14 +738,24 @@ next: return toJSONError(ctx, errAccessDenied) } } - _, replicateDel := checkReplicateDelete(ctx, getObjectInfoFn, args.BucketName, ObjectToDelete{ObjectName: objectName}) - if replicateDel { - opts.DeleteMarkerReplicationStatus = string(replication.Pending) - opts.DeleteMarker = true + var ( + replicateDel, hasLifecycleConfig bool + goi ObjectInfo + gerr error + ) + if _, err := globalBucketMetadataSys.GetLifecycleConfig(args.BucketName); err == nil { + hasLifecycleConfig = true + } + if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig { + goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts) + if _, replicateDel = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: objectName}, goi, gerr); replicateDel { + opts.DeleteMarkerReplicationStatus = string(replication.Pending) + opts.DeleteMarker = true + } } oi, err := deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, nil, r, opts) - if replicateDel { + if replicateDel && err == nil { globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{ DeletedObject: DeletedObject{ ObjectName: objectName, @@ -757,6 +768,19 @@ next: Bucket: args.BucketName, }) } + if goi.TransitionStatus == lifecycle.TransitionComplete && err == nil && goi.VersionID == "" { + action := lifecycle.DeleteAction + if goi.VersionID != "" { + action = lifecycle.DeleteVersionAction + } + deleteTransitionedObject(ctx, newObjectLayerFn(), args.BucketName, objectName, lifecycle.ObjectOpts{ + Name: objectName, + UserTags: goi.UserTags, + VersionID: goi.VersionID, + DeleteMarker: goi.DeleteMarker, + IsLatest: goi.IsLatest, + }, action, true) + } logger.LogIf(ctx, err) continue @@ -829,7 +853,7 @@ next: } } // since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication - _, replicateDel := checkReplicateDelete(ctx, getObjectInfoFn, args.BucketName, ObjectToDelete{ObjectName: obj.Name}) + _, replicateDel := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name}, obj, nil) objToDel := ObjectToDelete{ObjectName: obj.Name} if replicateDel { objToDel.DeleteMarkerReplicationStatus = string(replication.Pending) diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index f3d6a5ca3..813ca3d0a 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -508,6 +508,9 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { case DeleteType: if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) { if updateVersion { + if len(z.Versions[i].DeleteMarker.MetaSys) == 0 { + z.Versions[i].DeleteMarker.MetaSys = make(map[string][]byte) + } delete(z.Versions[i].DeleteMarker.MetaSys, xhttp.AmzBucketReplicationStatus) delete(z.Versions[i].DeleteMarker.MetaSys, VersionPurgeStatusKey) if fi.DeleteMarkerReplicationStatus != "" { diff --git a/pkg/madmin/remote-target-commands.go b/pkg/madmin/remote-target-commands.go index c881cbf65..53daf02d3 100644 --- a/pkg/madmin/remote-target-commands.go +++ b/pkg/madmin/remote-target-commands.go @@ -118,20 +118,15 @@ func (t *BucketTarget) Clone() BucketTarget { } // URL returns target url -func (t BucketTarget) URL() string { +func (t BucketTarget) URL() *url.URL { scheme := "http" if t.Secure { scheme = "https" } - urlStr := fmt.Sprintf("%s://%s", scheme, t.Endpoint) - u, err := url.Parse(urlStr) - if err != nil { - return urlStr + return &url.URL{ + Scheme: scheme, + Host: t.Endpoint, } - if u.Port() == "80" || u.Port() == "443" { - u.Host = u.Hostname() - } - return u.String() } // Empty returns true if struct is empty.