fix: add additional decommission traces for ILM expired content (#17522)

current decommission traces were missing for

- Skipped ILM expired versions
- Skipped single DELETE marked version
- A success or failure in decommissioning DELETE marker
- allow additional info to be shared in DecomStatus() API
This commit is contained in:
Harshavardhana 2023-06-27 11:59:40 -07:00 committed by GitHub
parent 1818764840
commit d2f5c3621f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -62,10 +62,10 @@ type PoolDecommissionInfo struct {
Object string `json:"-" msg:"obj"`
// Verbose information
ItemsDecommissioned int64 `json:"-" msg:"id"`
ItemsDecommissionFailed int64 `json:"-" msg:"idf"`
BytesDone int64 `json:"-" msg:"bd"`
BytesFailed int64 `json:"-" msg:"bf"`
ItemsDecommissioned int64 `json:"objectsDecommissioned" msg:"id"`
ItemsDecommissionFailed int64 `json:"objectsDecommissionedFailed" msg:"idf"`
BytesDone int64 `json:"bytesDecommissioned" msg:"bd"`
BytesFailed int64 `json:"bytesDecommissionedFailed" msg:"bf"`
}
// bucketPop should be called when a bucket is done decommissioning.
@ -736,10 +736,12 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
var decommissioned, expired int
for _, version := range fivs.Versions {
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionObject, idx, bi.Name, version.Name, version.VersionID)
// Apply lifecycle rules on the objects that are expired.
if filterLifecycle(bi.Name, version.Name, version) {
expired++
decommissioned++
stopFn(errors.New("ILM expired object/version will be skipped"))
continue
}
@ -749,6 +751,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
remainingVersions := len(fivs.Versions) - expired
if version.Deleted && remainingVersions == 1 {
decommissioned++
stopFn(errors.New("DELETE marked object with no other non-current versions will be skipped"))
continue
}
@ -773,7 +776,13 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
SkipDecommissioned: true, // make sure we skip the decommissioned pool
})
var failure bool
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
if err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
err = nil
}
}
stopFn(err)
if err != nil {
logger.LogIf(ctx, err)
failure = true
}
@ -791,7 +800,6 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
// gr.Close() is ensured by decommissionObject().
for try := 0; try < 3; try++ {
if version.IsRemote() {
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionObject, idx, bi.Name, version.Name, version.VersionID)
if err := z.DecomTieredObject(ctx, bi.Name, version.Name, version, ObjectOptions{
VersionID: versionID,
MTime: version.ModTime,
@ -806,7 +814,6 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
failure = false
break
}
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionObject, idx, bi.Name, version.Name, version.VersionID)
gr, err := set.GetObjectNInfo(ctx,
bi.Name,
encodeDirObject(version.Name),