Add trace support for decommissioning (#15502)

* Add trace support for decommissioning
* Add support for tracing errors during decommission
This commit is contained in:
Krishnan Parthasarathi
2022-08-10 12:46:45 -07:00
committed by GitHub
parent b940fe8fca
commit 91e6af4470
4 changed files with 75 additions and 3 deletions

View File

@@ -800,11 +800,14 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
logger.LogIf(ctx, err)
continue
}
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionObject, idx, bi.Name, version.Name, version.VersionID)
if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
stopFn(err)
failure = true
logger.LogIf(ctx, err)
continue
}
stopFn(nil)
failure = false
break
}
@@ -819,6 +822,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
// if all versions were decommissioned, then we can delete the object versions.
if decommissionedCount == len(fivs.Versions) {
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionRemoveObject, idx, bi.Name, entry.name)
_, err := set.DeleteObject(ctx,
bi.Name,
encodeDirObject(entry.name),
@@ -826,6 +830,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
DeletePrefix: true, // use prefix delete to delete all versions at once.
},
)
stopFn(err)
auditLogDecom(ctx, "DecomDeleteObject", bi.Name, entry.name, "", err)
if err != nil {
logger.LogIf(ctx, err)
@@ -881,6 +886,47 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
return nil
}
//msgp:ignore decomMetrics
type decomMetrics struct{}
var globalDecommissionMetrics decomMetrics
//msgp:ignore decomMetric
//go:generate stringer -type=decomMetric -trimprefix=decomMetric $GOFILE
type decomMetric uint8
const (
decomMetricDecommissionBucket decomMetric = iota
decomMetricDecommissionObject
decomMetricDecommissionRemoveObject
)
func decomTrace(d decomMetric, poolIdx int, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo {
var errStr string
if err != nil {
errStr = err.Error()
}
return madmin.TraceInfo{
TraceType: madmin.TraceDecommission,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: fmt.Sprintf("decommission.%s (pool-id=%d)", d.String(), poolIdx),
Duration: duration,
Path: path,
Error: errStr,
}
}
func (m *decomMetrics) log(d decomMetric, poolIdx int, paths ...string) func(err error) {
startTime := time.Now()
return func(err error) {
duration := time.Since(startTime)
if globalTrace.NumSubscribers(madmin.TraceDecommission) > 0 {
globalTrace.Publish(decomTrace(d, poolIdx, startTime, duration, strings.Join(paths, " "), err))
}
}
}
func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx int) error {
pool := z.serverPools[idx]
for _, bucket := range z.poolMeta.PendingBuckets(idx) {
@@ -898,9 +944,13 @@ func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx i
if serverDebugLog {
console.Debugln("decommission: currently on bucket", bucket.Name)
}
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionBucket, idx, bucket.Name)
if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
stopFn(err)
return err
}
stopFn(nil)
z.poolMetaMutex.Lock()
z.poolMeta.BucketDone(idx, bucket)
z.poolMeta.save(ctx, z.serverPools)