From ef4d023c855e145e024f380be37459b6bdb45e8a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 17 Aug 2021 07:50:00 -0700 Subject: [PATCH] fix: various performance improvements to tiering (#12965) - deletes should always Sweep() for tiering at the end and do not need an extra getObjectInfo() call - puts, copy and multipart writes should conditionally do getObjectInfo() when tiering targets are configured - introduce 'TransitionedObject' struct for ease of usage and understanding. - multiple-pools optimization: deletes don't need to hold read locks while verifying objects across namespace and pools. --- cmd/api-headers.go | 2 +- cmd/bucket-handlers.go | 58 +++++++++++-------- cmd/bucket-lifecycle.go | 14 ++--- cmd/data-scanner.go | 10 ++-- cmd/erasure-metadata.go | 12 ++-- cmd/erasure-server-pool.go | 109 ++++++++++++++++++++++++------------ cmd/object-api-datatypes.go | 23 ++++---- cmd/object-api-options.go | 5 ++ cmd/object-handlers.go | 71 ++++++++++++++--------- cmd/tier-sweeper.go | 68 +++++++--------------- cmd/tier.go | 5 ++ 11 files changed, 213 insertions(+), 164 deletions(-) diff --git a/cmd/api-headers.go b/cmd/api-headers.go index a07de1e56..210c29f7e 100644 --- a/cmd/api-headers.go +++ b/cmd/api-headers.go @@ -187,7 +187,7 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp if objInfo.IsRemote() { // Check if object is being restored. 
For more information on x-amz-restore header see // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax - w.Header()[xhttp.AmzStorageClass] = []string{objInfo.TransitionTier} + w.Header()[xhttp.AmzStorageClass] = []string{objInfo.TransitionedObject.Tier} } if lc, err := globalLifecycleSys.Get(objInfo.Bucket); err == nil { diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index a5c961ab2..7d16ce4b9 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -450,6 +450,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, if api.CacheAPI() != nil { getObjectInfoFn = api.CacheAPI().GetObjectInfo } + var ( hasLockEnabled, replicateSync bool goi ObjectInfo @@ -460,6 +461,9 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, hasLockEnabled = true } + versioned := globalBucketVersioningSys.Enabled(bucket) + suspended := globalBucketVersioningSys.Suspended(bucket) + dErrs := make([]DeleteError, len(deleteObjects.Objects)) oss := make([]*objSweeper, len(deleteObjects.Objects)) for index, object := range deleteObjects.Objects { @@ -491,16 +495,24 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } } - oss[index] = newObjSweeper(bucket, object.ObjectName).WithVersion(multiDelete(object)) - // Mutations of objects on versioning suspended buckets - // affect its null version. Through opts below we select - // the null version's remote object to delete if - // transitioned. 
- opts := oss[index].GetOpts() - goi, gerr = getObjectInfoFn(ctx, bucket, object.ObjectName, opts) - if gerr == nil { - oss[index].SetTransitionState(goi) + opts := ObjectOptions{ + VersionID: object.VersionID, + Versioned: versioned, + VersionSuspended: suspended, } + + if replicateDeletes || object.VersionID != "" && hasLockEnabled || !globalTierConfigMgr.Empty() { + if !globalTierConfigMgr.Empty() && object.VersionID == "" && opts.VersionSuspended { + opts.VersionID = nullVersionID + } + goi, gerr = getObjectInfoFn(ctx, bucket, object.ObjectName, opts) + } + + if !globalTierConfigMgr.Empty() { + oss[index] = newObjSweeper(bucket, object.ObjectName).WithVersion(opts.VersionID).WithVersioning(versioned, suspended) + oss[index].SetTransitionState(goi.TransitionedObject) + } + if replicateDeletes { replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ ObjectName: object.ObjectName, @@ -522,18 +534,16 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } } } - if object.VersionID != "" { - if hasLockEnabled { - if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); apiErrCode != ErrNone { - apiErr := errorCodes.ToAPIErr(apiErrCode) - dErrs[index] = DeleteError{ - Code: apiErr.Code, - Message: apiErr.Description, - Key: object.ObjectName, - VersionID: object.VersionID, - } - continue + if object.VersionID != "" && hasLockEnabled { + if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); apiErrCode != ErrNone { + apiErr := errorCodes.ToAPIErr(apiErrCode) + dErrs[index] = DeleteError{ + Code: apiErr.Code, + Message: apiErr.Description, + Key: object.ObjectName, + VersionID: object.VersionID, } + continue } } @@ -555,8 +565,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, deleteList := toNames(objectsToDelete) dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{ - Versioned: 
globalBucketVersioningSys.Enabled(bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + Versioned: versioned, + VersionSuspended: suspended, }) deletedObjects := make([]DeletedObject, len(deleteObjects.Objects)) for i := range errs { @@ -620,6 +630,10 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, // Notify deleted event for objects. for _, dobj := range deletedObjects { + if dobj.ObjectName == "" { + continue + } + eventName := event.ObjectRemovedDelete objInfo := ObjectInfo{ Name: dobj.ObjectName, diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 341de22c2..34e4f8a92 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -232,9 +232,9 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *Ob // When an object is past expiry or when a transitioned object is being // deleted, 'mark' the data in the remote tier for delete. entry := jentry{ - ObjName: oi.transitionedObjName, - VersionID: oi.transitionVersionID, - TierName: oi.TransitionTier, + ObjName: oi.TransitionedObject.Name, + VersionID: oi.TransitionedObject.VersionID, + TierName: oi.TransitionedObject.Tier, } if err := globalTierJournal.AddEntry(entry); err != nil { logger.LogIf(ctx, err) @@ -316,7 +316,7 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo) // getTransitionedObjectReader returns a reader from the transitioned tier. 
func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, oi ObjectInfo, opts ObjectOptions) (gr *GetObjectReader, err error) { - tgtClient, err := globalTierConfigMgr.getDriver(oi.TransitionTier) + tgtClient, err := globalTierConfigMgr.getDriver(oi.TransitionedObject.Tier) if err != nil { return nil, fmt.Errorf("transition storage class not configured") } @@ -333,7 +333,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs gopts.length = length } - reader, err := tgtClient.Get(ctx, oi.transitionedObjName, remoteVersionID(oi.transitionVersionID), gopts) + reader, err := tgtClient.Get(ctx, oi.TransitionedObject.Name, remoteVersionID(oi.TransitionedObject.VersionID), gopts) if err != nil { return nil, err } @@ -557,7 +557,7 @@ func (fi FileInfo) IsRemote() bool { // IsRemote returns true if this object version's contents are in its remote // tier. func (oi ObjectInfo) IsRemote() bool { - if oi.TransitionStatus != lifecycle.TransitionComplete { + if oi.TransitionedObject.Status != lifecycle.TransitionComplete { return false } return !isRestoredObjectOnDisk(oi.UserDefined) @@ -685,7 +685,7 @@ func (oi ObjectInfo) ToLifecycleOpts() lifecycle.ObjectOpts { SuccessorModTime: oi.SuccessorModTime, RestoreOngoing: oi.RestoreOngoing, RestoreExpires: oi.RestoreExpires, - TransitionStatus: oi.TransitionStatus, + TransitionStatus: oi.TransitionedObject.Status, RemoteTiersImmediately: globalDebugRemoteTiersImmediately, } } diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index c959deea0..a651a54f4 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -913,7 +913,7 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac SuccessorModTime: meta.oi.SuccessorModTime, RestoreOngoing: meta.oi.RestoreOngoing, RestoreExpires: meta.oi.RestoreExpires, - TransitionStatus: meta.oi.TransitionStatus, + TransitionStatus: meta.oi.TransitionedObject.Status, 
RemoteTiersImmediately: globalDebugRemoteTiersImmediately, }) if i.debug { @@ -976,7 +976,7 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac // applyTierObjSweep removes remote object pending deletion and the free-version // tracking this information. func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta actionMeta) { - if !meta.oi.tierFreeVersion { + if !meta.oi.TransitionedObject.FreeVersion { // nothing to be done return } @@ -989,7 +989,7 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta return err } // Remove the remote object - err := deleteObjectFromRemoteTier(ctx, meta.oi.transitionedObjName, meta.oi.transitionVersionID, meta.oi.TransitionTier) + err := deleteObjectFromRemoteTier(ctx, meta.oi.TransitionedObject.Name, meta.oi.TransitionedObject.VersionID, meta.oi.TransitionedObject.Tier) if ignoreNotFoundErr(err) != nil { logger.LogIf(ctx, err) return @@ -1065,7 +1065,7 @@ func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, obj Ob func applyTransitionAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) bool { srcOpts := ObjectOptions{} - if obj.TransitionStatus == "" { + if obj.TransitionedObject.Status == "" { srcOpts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket) srcOpts.VersionID = obj.VersionID // mark transition as pending @@ -1137,7 +1137,7 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay // Apply object, object version, restored object or restored object version action on the given object func applyExpiryRule(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject, applyOnVersion bool) bool { - if obj.TransitionStatus != "" { + if obj.TransitionedObject.Status != "" { return applyExpiryOnTransitionedObject(ctx, objLayer, obj, restoredObject) } return applyExpiryOnNonTransitionedObjects(ctx, objLayer, obj, applyOnVersion) diff --git 
a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 444382fea..1bfff0e01 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -162,11 +162,13 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { objInfo.ReplicationStatus = replication.StatusType(fi.DeleteMarkerReplicationStatus) } - objInfo.TransitionStatus = fi.TransitionStatus - objInfo.transitionedObjName = fi.TransitionedObjName - objInfo.transitionVersionID = fi.TransitionVersionID - objInfo.tierFreeVersion = fi.TierFreeVersion() - objInfo.TransitionTier = fi.TransitionTier + objInfo.TransitionedObject = TransitionedObject{ + Name: fi.TransitionedObjName, + VersionID: fi.TransitionVersionID, + Status: fi.TransitionStatus, + FreeVersion: fi.TierFreeVersion(), + Tier: fi.TransitionTier, + } // etag/md5Sum has already been extracted. We need to // remove to avoid it from appearing as part of diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index b72745763..98a154e55 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -273,12 +273,7 @@ type poolObjInfo struct { Err error } -// getPoolIdxExisting returns the (first) found object pool index containing an object. -// If the object exists, but the latest version is a delete marker, the index with it is still returned. -// If the object does not exist ObjectNotFound error is returned. -// If any other error is found, it is returned. -// The check is skipped if there is only one zone, and 0, nil is always returned in that case. 
-func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) { +func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, bucket, object string, opts ObjectOptions) (idx int, err error) { if z.SinglePool() { return 0, nil } @@ -294,7 +289,7 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj pinfo := poolObjInfo{ PoolIndex: i, } - pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{}) + pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts) poolObjInfos[i] = pinfo }(i, pool) } @@ -331,6 +326,19 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj return -1, toObjectErr(errFileNotFound, bucket, object) } +func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) { + return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{NoLock: true}) +} + +// getPoolIdxExisting returns the (first) found object pool index containing an object. +// If the object exists, but the latest version is a delete marker, the index with it is still returned. +// If the object does not exist ObjectNotFound error is returned. +// If any other error is found, it is returned. +// The check is skipped if there is only one zone, and 0, nil is always returned in that case. +func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) { + return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{}) +} + // getPoolIdx returns the found previous object and its corresponding pool idx, // if none are found falls back to most available space pool. 
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) { @@ -837,27 +845,6 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o objSets.Add(objects[i].ObjectName) } - poolObjIdxMap := map[int][]ObjectToDelete{} - origIndexMap := map[int][]int{} - if !z.SinglePool() { - for j, obj := range objects { - idx, err := z.getPoolIdxExisting(ctx, bucket, obj.ObjectName) - if isErrObjectNotFound(err) { - derrs[j] = err - continue - } - if err != nil { - // Unhandled errors return right here. - for i := range derrs { - derrs[i] = err - } - return dobjects, derrs - } - poolObjIdxMap[idx] = append(poolObjIdxMap[idx], obj) - origIndexMap[idx] = append(origIndexMap[idx], j) - } - } - // Acquire a bulk write lock across 'objects' multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...) lkctx, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout) @@ -874,17 +861,65 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o return z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts) } - for idx, pool := range z.serverPools { - objs := poolObjIdxMap[idx] - orgIndexes := origIndexMap[idx] - deletedObjects, errs := pool.DeleteObjects(ctx, bucket, objs, opts) - for i, derr := range errs { - if derr != nil { - derrs[orgIndexes[i]] = derr + // Fetch location of up to 10 objects concurrently. + poolObjIdxMap := map[int][]ObjectToDelete{} + origIndexMap := map[int][]int{} + + var mu sync.Mutex + eg := errgroup.WithNErrs(len(objects)).WithConcurrency(10) + cctx, cancel := eg.WithCancelOnError(ctx) + defer cancel() + for j, obj := range objects { + j := j + obj := obj + eg.Go(func() error { + idx, err := z.getPoolIdxExistingNoLock(cctx, bucket, obj.ObjectName) + if isErrObjectNotFound(err) { + derrs[j] = err + return nil } - dobjects[orgIndexes[i]] = deletedObjects[i] - } + if err != nil { + // unhandled errors return right here. 
+ return err + } + mu.Lock() + poolObjIdxMap[idx] = append(poolObjIdxMap[idx], obj) + origIndexMap[idx] = append(origIndexMap[idx], j) + mu.Unlock() + return nil + }, j) } + + if err := eg.WaitErr(); err != nil { + for i := range derrs { + derrs[i] = err + } + return dobjects, derrs + } + + // Delete concurrently in all server pools. + var wg sync.WaitGroup + wg.Add(len(z.serverPools)) + for idx, pool := range z.serverPools { + go func(idx int, pool *erasureSets) { + defer wg.Done() + objs := poolObjIdxMap[idx] + if len(objs) > 0 { + orgIndexes := origIndexMap[idx] + deletedObjects, errs := pool.DeleteObjects(ctx, bucket, objs, opts) + mu.Lock() + for i, derr := range errs { + if derr != nil { + derrs[orgIndexes[i]] = derr + } + dobjects[orgIndexes[i]] = deletedObjects[i] + } + mu.Unlock() + } + }(idx, pool) + } + wg.Wait() + return dobjects, derrs } diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 6a0ede23b..03779211e 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -113,16 +113,8 @@ type ObjectInfo struct { // to a delete marker on an object. 
DeleteMarker bool - // tierFreeVersion is true if this is a free-version - tierFreeVersion bool - // TransitionStatus indicates if transition is complete/pending - TransitionStatus string - // Name of transitioned object on remote tier - transitionedObjName string - // VersionID on the the remote tier - transitionVersionID string - // Name of remote tier object has transitioned to - TransitionTier string + // Transitioned object information + TransitionedObject TransitionedObject // RestoreExpires indicates date a restored object expires RestoreExpires time.Time @@ -200,7 +192,7 @@ func (o ObjectInfo) Clone() (cinfo ObjectInfo) { VersionID: o.VersionID, IsLatest: o.IsLatest, DeleteMarker: o.DeleteMarker, - TransitionStatus: o.TransitionStatus, + TransitionedObject: o.TransitionedObject, RestoreExpires: o.RestoreExpires, RestoreOngoing: o.RestoreOngoing, ContentType: o.ContentType, @@ -354,6 +346,15 @@ type ListMultipartsInfo struct { EncodingType string // Not supported yet. } +// TransitionedObject transitioned object tier and status. +type TransitionedObject struct { + Name string + VersionID string + Tier string + FreeVersion bool + Status string +} + // DeletedObjectInfo - container for list objects versions deleted objects. type DeletedObjectInfo struct { // Name of the bucket. 
diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index 3f71f0156..26cf230fd 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -219,6 +219,7 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts // get ObjectOptions for PUT calls from encryption headers and metadata func putOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) { versioned := globalBucketVersioningSys.Enabled(bucket) + versionSuspended := globalBucketVersioningSys.Suspended(bucket) vid := strings.TrimSpace(r.Form.Get(xhttp.VersionID)) if vid != "" && vid != nullVersionID { _, err := uuid.Parse(vid) @@ -266,6 +267,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada UserDefined: metadata, VersionID: vid, Versioned: versioned, + VersionSuspended: versionSuspended, MTime: mtime, }, nil } @@ -273,6 +275,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada opts, err = getOpts(ctx, r, bucket, object) opts.VersionID = vid opts.Versioned = versioned + opts.VersionSuspended = versionSuspended opts.UserDefined = metadata return } @@ -290,6 +293,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada UserDefined: metadata, VersionID: vid, Versioned: versioned, + VersionSuspended: versionSuspended, MTime: mtime, }, nil } @@ -300,6 +304,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada } opts.VersionID = vid opts.Versioned = versioned + opts.VersionSuspended = versionSuspended opts.MTime = mtime return opts, nil } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 2fc524d6c..122f0b911 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1372,12 +1372,14 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re objInfo.ModTime = remoteObjInfo.LastModified } else { - os := 
newObjSweeper(dstBucket, dstObject) + os := newObjSweeper(dstBucket, dstObject).WithVersioning(dstOpts.Versioned, dstOpts.VersionSuspended) // Get appropriate object info to identify the remote object to delete if !srcInfo.metadataOnly { goiOpts := os.GetOpts() - if goi, gerr := getObjectInfo(ctx, dstBucket, dstObject, goiOpts); gerr == nil { - os.SetTransitionState(goi) + if !globalTierConfigMgr.Empty() { + if goi, gerr := getObjectInfo(ctx, dstBucket, dstObject, goiOpts); gerr == nil { + os.SetTransitionState(goi.TransitionedObject) + } } } @@ -1395,7 +1397,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re } // Remove the transitioned object whose object version is being overwritten. - defer logger.LogIf(ctx, os.Sweep()) + if !globalTierConfigMgr.Empty() { + logger.LogIf(ctx, os.Sweep()) + } } objInfo.ETag = getDecryptedETag(r.Header, objInfo, false) @@ -1685,11 +1689,13 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req // Ensure that metadata does not contain sensitive information crypto.RemoveSensitiveEntries(metadata) - os := newObjSweeper(bucket, object) - // Get appropriate object info to identify the remote object to delete - goiOpts := os.GetOpts() - if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { - os.SetTransitionState(goi) + os := newObjSweeper(bucket, object).WithVersioning(opts.Versioned, opts.VersionSuspended) + if !globalTierConfigMgr.Empty() { + // Get appropriate object info to identify the remote object to delete + goiOpts := os.GetOpts() + if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { + os.SetTransitionState(goi.TransitionedObject) + } } // Create the object.. @@ -1752,7 +1758,9 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req }) // Remove the transitioned object whose object version is being overwritten. 
- logger.LogIf(ctx, os.Sweep()) + if !globalTierConfigMgr.Empty() { + logger.LogIf(ctx, os.Sweep()) + } } // PutObjectExtractHandler - PUT Object extract is an extended API @@ -3061,11 +3069,15 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite w.(http.Flusher).Flush() } - os := newObjSweeper(bucket, object) - // Get appropriate object info to identify the remote object to delete - goiOpts := os.GetOpts() - if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { - os.SetTransitionState(goi) + versioned := globalBucketVersioningSys.Enabled(bucket) + suspended := globalBucketVersioningSys.Suspended(bucket) + os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended) + if !globalTierConfigMgr.Empty() { + // Get appropriate object info to identify the remote object to delete + goiOpts := os.GetOpts() + if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { + os.SetTransitionState(goi.TransitionedObject) + } } setEventStreamHeaders(w) @@ -3134,7 +3146,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite }) // Remove the transitioned object whose object version is being overwritten. - logger.LogIf(ctx, os.Sweep()) + if !globalTierConfigMgr.Empty() { + logger.LogIf(ctx, os.Sweep()) + } } /// Delete objectAPIHandlers @@ -3164,11 +3178,6 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. return } - getObjectInfo := objectAPI.GetObjectInfo - if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo - } - if globalDNSConfig != nil { _, err := globalDNSConfig.Get(bucket) if err != nil && err != dns.ErrNotImplemented { @@ -3187,16 +3196,20 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. 
gerr error ) - var goiOpts ObjectOptions - os := newObjSweeper(bucket, object).WithVersion(singleDelete(*r)) + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + os := newObjSweeper(bucket, object).WithVersion(opts.VersionID).WithVersioning(opts.Versioned, opts.VersionSuspended) // Mutations of objects on versioning suspended buckets // affect its null version. Through opts below we select // the null version's remote object to delete if // transitioned. - goiOpts = os.GetOpts() + goiOpts := os.GetOpts() goi, gerr = getObjectInfo(ctx, bucket, object, goiOpts) if gerr == nil { - os.SetTransitionState(goi) + os.SetTransitionState(goi.TransitionedObject) } replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr) @@ -3309,8 +3322,9 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } // Remove the transitioned object whose object version is being overwritten. 
- logger.LogIf(ctx, os.Sweep()) - + if !globalTierConfigMgr.Empty() { + os.Sweep() + } } // PutObjectLegalHoldHandler - set legal hold configuration to object, @@ -3907,13 +3921,14 @@ func (api objectAPIHandlers) PostRestoreObjectHandler(w http.ResponseWriter, r * writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return } + objInfo, err := getObjectInfo(ctx, bucket, object, opts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return } - if objInfo.TransitionStatus != lifecycle.TransitionComplete { + if objInfo.TransitionedObject.Status != lifecycle.TransitionComplete { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL) return } diff --git a/cmd/tier-sweeper.go b/cmd/tier-sweeper.go index 24cbcedda..8553fc205 100644 --- a/cmd/tier-sweeper.go +++ b/cmd/tier-sweeper.go @@ -18,11 +18,7 @@ package cmd import ( - "net/http" - "strings" - "github.com/minio/minio/internal/bucket/lifecycle" - xhttp "github.com/minio/minio/internal/http" ) // objSweeper determines if a transitioned object needs to be removed from the remote tier. @@ -43,7 +39,7 @@ import ( type objSweeper struct { Object string Bucket string - ReqVersion string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs + VersionID string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs Versioned bool Suspended bool TransitionStatus string @@ -55,46 +51,22 @@ type objSweeper struct { // newObjSweeper returns an objSweeper for a given bucket and object. // It initializes the versioning information using bucket name. 
func newObjSweeper(bucket, object string) *objSweeper { - versioned := globalBucketVersioningSys.Enabled(bucket) - suspended := globalBucketVersioningSys.Suspended(bucket) return &objSweeper{ - Object: object, - Bucket: bucket, - Versioned: versioned, - Suspended: suspended, + Object: object, + Bucket: bucket, } } -// versionIDer interface is used to fetch object versionIDer from disparate sources -// like http.Request and ObjectToDelete. -type versionIDer interface { - GetVersionID() string -} - -// multiDelete is a type alias for ObjectToDelete to implement versionID -// interface -type multiDelete ObjectToDelete - -// GetVersionID returns object version of an object to be deleted via -// multi-delete API. -func (md multiDelete) GetVersionID() string { - return md.VersionID -} - -// singleDelete is a type alias for http.Request to implement versionID -// interface -type singleDelete http.Request - -// GetVersionID returns object version of an object to be deleted via (simple) -// delete API. Note only when the versionID is set explicitly by the application -// will we return a non-empty versionID. -func (sd singleDelete) GetVersionID() string { - return strings.TrimSpace(sd.URL.Query().Get(xhttp.VersionID)) -} - // WithVersion sets the version ID from v -func (os *objSweeper) WithVersion(v versionIDer) *objSweeper { - os.ReqVersion = v.GetVersionID() +func (os *objSweeper) WithVersion(vid string) *objSweeper { + os.VersionID = vid + return os +} + +// WithVersioning sets bucket versioning for sweeper. +func (os *objSweeper) WithVersioning(versioned, suspended bool) *objSweeper { + os.Versioned = versioned + os.Suspended = suspended return os } @@ -102,22 +74,22 @@ func (os *objSweeper) WithVersion(v versionIDer) *objSweeper { // overwritten or deleted depending on bucket versioning status. 
func (os *objSweeper) GetOpts() ObjectOptions { opts := ObjectOptions{ - VersionID: os.ReqVersion, + VersionID: os.VersionID, Versioned: os.Versioned, VersionSuspended: os.Suspended, } - if os.Suspended && os.ReqVersion == "" { + if os.Suspended && os.VersionID == "" { opts.VersionID = nullVersionID } return opts } // SetTransitionState sets ILM transition related information from given info. -func (os *objSweeper) SetTransitionState(info ObjectInfo) { - os.TransitionTier = info.TransitionTier - os.TransitionStatus = info.TransitionStatus - os.RemoteObject = info.transitionedObjName - os.TransitionVersionID = info.transitionVersionID +func (os *objSweeper) SetTransitionState(info TransitionedObject) { + os.TransitionTier = info.Tier + os.TransitionStatus = info.Status + os.RemoteObject = info.Name + os.TransitionVersionID = info.VersionID } // shouldRemoveRemoteObject determines if a transitioned object should be @@ -140,7 +112,7 @@ func (os *objSweeper) shouldRemoveRemoteObject() (jentry, bool) { switch { case !os.Versioned, os.Suspended: // 1, 2.a, 2.b delTier = true - case os.Versioned && os.ReqVersion != "": // 3.a + case os.Versioned && os.VersionID != "": // 3.a delTier = true } if delTier { diff --git a/cmd/tier.go b/cmd/tier.go index da037d1cb..7891e8407 100644 --- a/cmd/tier.go +++ b/cmd/tier.go @@ -115,6 +115,11 @@ func (config *TierConfigMgr) Add(ctx context.Context, tier madmin.TierConfig) er return nil } +// Empty returns if tier targets are empty +func (config *TierConfigMgr) Empty() bool { + return len(config.ListTiers()) == 0 +} + // ListTiers lists remote tiers configured in this deployment. func (config *TierConfigMgr) ListTiers() []madmin.TierConfig { config.RLock()