do not count ILM expired objects and other skipped objects (#17184)

This commit is contained in:
Harshavardhana 2023-05-11 13:35:16 -07:00 committed by GitHub
parent 77db9686fb
commit 3637aad36e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -778,14 +778,6 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
continue
}
// We will skip decommissioning delete markers
// with single version, its as good as there
// is no data associated with the object.
if version.Deleted && len(fivs.Versions) == 1 {
logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bi.Name, version.Name))
continue
}
if version.Deleted {
_, err := z.DeleteObject(ctx,
bi.Name,
@ -836,6 +828,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
failure = false
break
}
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionObject, idx, bi.Name, version.Name, version.VersionID)
gr, err := set.GetObjectNInfo(ctx,
bi.Name,
encodeDirObject(version.Name),
@ -849,14 +842,24 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
// object deleted by the application, nothing to do here we move on.
ignore = true
stopFn(nil)
break
}
if err != nil && !ignore {
// if usage-cache.bin is not readable log and ignore it.
if bi.Name == minioMetaBucket && strings.Contains(version.Name, dataUsageCacheName) {
ignore = true
stopFn(err)
logger.LogIf(ctx, err)
break
}
}
if err != nil {
failure = true
logger.LogIf(ctx, err)
stopFn(err)
continue
}
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionObject, idx, bi.Name, version.Name, version.VersionID)
if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
stopFn(err)
failure = true
@ -1003,17 +1006,70 @@ func (z *erasureServerPools) checkAfterDecom(ctx context.Context, idx int) error
pool := z.serverPools[idx]
for _, set := range pool.sets {
for _, bi := range buckets {
var objectsFound int
vc, _ := globalBucketVersioningSys.Get(bi.Name)
// Check if the current bucket has a configured lifecycle policy
lc, _ := globalLifecycleSys.Get(bi.Name)
// Check if bucket is object locked.
lr, _ := globalBucketObjectLockSys.Get(bi.Name)
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
if lc == nil {
return false
}
versioned := vc != nil && vc.Versioned(object)
objInfo := fi.ToObjectInfo(bucket, object, versioned)
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
switch {
case evt.Action.DeleteRestored(): // if restored copy has expired,delete it synchronously
applyExpiryOnTransitionedObject(ctx, z, objInfo, evt)
return false
case evt.Action.Delete():
globalExpiryState.enqueueByDays(objInfo, evt)
return true
default:
return false
}
}
var versionsFound int
err := set.listObjectsToDecommission(ctx, bi, func(entry metaCacheEntry) {
if entry.isObject() {
objectsFound++
if !entry.isObject() {
return
}
fivs, err := entry.fileInfoVersions(bi.Name)
if err != nil {
return
}
// We need a reversed order for decommissioning,
// to create the appropriate stack.
versionsSorter(fivs.Versions).reverse()
for _, version := range fivs.Versions {
// Apply lifecycle rules on the objects that are expired.
if filterLifecycle(bi.Name, version.Name, version) {
continue
}
// `.usage-cache.bin` still exists, must be not readable ignore it.
if bi.Name == minioMetaBucket && strings.Contains(version.Name, dataUsageCacheName) {
// skipping bucket usage cache name, as its autogenerated.
continue
}
versionsFound++
}
})
if err != nil {
return err
}
if objectsFound > 0 {
return fmt.Errorf("at least %d objects were found in bucket `%s` after decommissioning", objectsFound, bi.Name)
if versionsFound > 0 {
return fmt.Errorf("at least %d object(s)/version(s) were found in bucket `%s` after decommissioning", versionsFound, bi.Name)
}
}
}