mirror of
https://github.com/minio/minio.git
synced 2025-04-22 11:26:36 -04:00
fix: batch status reporting after complete (#17852)
batch status can perpetually wait after completion due to a race between the MetricsHandler() returning the active metrics in intervals of 1sec and delete of metrics after job completion. this PR ensures that we keep the 'status' around for a while, i.e upto 24hrs for all the batch jobs.
This commit is contained in:
parent
c4ca0a5a57
commit
3ba927edae
@ -1413,7 +1413,6 @@ func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) {
|
|||||||
case j.KeyRotate != nil:
|
case j.KeyRotate != nil:
|
||||||
deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName))
|
deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName))
|
||||||
}
|
}
|
||||||
globalBatchJobsMetrics.delete(j.ID)
|
|
||||||
deleteConfig(ctx, api, j.Location)
|
deleteConfig(ctx, api, j.Location)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1815,10 +1814,6 @@ type batchJobMetrics struct {
|
|||||||
metrics map[string]*batchJobInfo
|
metrics map[string]*batchJobInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
var globalBatchJobsMetrics = batchJobMetrics{
|
|
||||||
metrics: make(map[string]*batchJobInfo),
|
|
||||||
}
|
|
||||||
|
|
||||||
//msgp:ignore batchJobMetric
|
//msgp:ignore batchJobMetric
|
||||||
//go:generate stringer -type=batchJobMetric -trimprefix=batchJobMetric $GOFILE
|
//go:generate stringer -type=batchJobMetric -trimprefix=batchJobMetric $GOFILE
|
||||||
type batchJobMetric uint8
|
type batchJobMetric uint8
|
||||||
@ -1858,9 +1853,17 @@ func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics)
|
|||||||
metrics = &madmin.BatchJobMetrics{CollectedAt: time.Now(), Jobs: make(map[string]madmin.JobMetric)}
|
metrics = &madmin.BatchJobMetrics{CollectedAt: time.Now(), Jobs: make(map[string]madmin.JobMetric)}
|
||||||
m.RLock()
|
m.RLock()
|
||||||
defer m.RUnlock()
|
defer m.RUnlock()
|
||||||
|
|
||||||
|
match := true
|
||||||
for id, job := range m.metrics {
|
for id, job := range m.metrics {
|
||||||
match := jobID != "" && id == jobID
|
if jobID != "" {
|
||||||
metrics.Jobs[id] = madmin.JobMetric{
|
match = id == jobID
|
||||||
|
}
|
||||||
|
if !match {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
m := madmin.JobMetric{
|
||||||
JobID: job.JobID,
|
JobID: job.JobID,
|
||||||
JobType: job.JobType,
|
JobType: job.JobType,
|
||||||
StartTime: job.StartTime,
|
StartTime: job.StartTime,
|
||||||
@ -1868,28 +1871,58 @@ func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics)
|
|||||||
RetryAttempts: job.RetryAttempts,
|
RetryAttempts: job.RetryAttempts,
|
||||||
Complete: job.Complete,
|
Complete: job.Complete,
|
||||||
Failed: job.Failed,
|
Failed: job.Failed,
|
||||||
Replicate: &madmin.ReplicateInfo{
|
}
|
||||||
|
|
||||||
|
switch job.JobType {
|
||||||
|
case string(madmin.BatchJobReplicate):
|
||||||
|
m.Replicate = &madmin.ReplicateInfo{
|
||||||
Bucket: job.Bucket,
|
Bucket: job.Bucket,
|
||||||
Object: job.Object,
|
Object: job.Object,
|
||||||
Objects: job.Objects,
|
Objects: job.Objects,
|
||||||
ObjectsFailed: job.ObjectsFailed,
|
ObjectsFailed: job.ObjectsFailed,
|
||||||
BytesTransferred: job.BytesTransferred,
|
BytesTransferred: job.BytesTransferred,
|
||||||
BytesFailed: job.BytesFailed,
|
BytesFailed: job.BytesFailed,
|
||||||
},
|
}
|
||||||
KeyRotate: &madmin.KeyRotationInfo{
|
case string(madmin.BatchJobKeyRotate):
|
||||||
|
m.KeyRotate = &madmin.KeyRotationInfo{
|
||||||
Bucket: job.Bucket,
|
Bucket: job.Bucket,
|
||||||
Object: job.Object,
|
Object: job.Object,
|
||||||
Objects: job.Objects,
|
Objects: job.Objects,
|
||||||
ObjectsFailed: job.ObjectsFailed,
|
ObjectsFailed: job.ObjectsFailed,
|
||||||
},
|
}
|
||||||
}
|
|
||||||
if match {
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics.Jobs[id] = m
|
||||||
}
|
}
|
||||||
return metrics
|
return metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// keep job metrics for some time after the job is completed
|
||||||
|
// in-case some one wants to look at the older results.
|
||||||
|
func (m *batchJobMetrics) purgeJobMetrics() {
|
||||||
|
t := time.NewTicker(6 * time.Hour)
|
||||||
|
defer t.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-GlobalContext.Done():
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
var toDeleteJobMetrics []string
|
||||||
|
m.RLock()
|
||||||
|
for id, metrics := range m.metrics {
|
||||||
|
if time.Since(metrics.LastUpdate) > 24*time.Hour && (metrics.Complete || metrics.Failed) {
|
||||||
|
toDeleteJobMetrics = append(toDeleteJobMetrics, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.RUnlock()
|
||||||
|
for _, jobID := range toDeleteJobMetrics {
|
||||||
|
m.delete(jobID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *batchJobMetrics) delete(jobID string) {
|
func (m *batchJobMetrics) delete(jobID string) {
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
|
@ -95,6 +95,9 @@ func init() {
|
|||||||
|
|
||||||
initGlobalContext()
|
initGlobalContext()
|
||||||
|
|
||||||
|
globalBatchJobsMetrics = batchJobMetrics{metrics: make(map[string]*batchJobInfo)}
|
||||||
|
go globalBatchJobsMetrics.purgeJobMetrics()
|
||||||
|
|
||||||
t, _ := minioVersionToReleaseTime(Version)
|
t, _ := minioVersionToReleaseTime(Version)
|
||||||
if !t.IsZero() {
|
if !t.IsZero() {
|
||||||
globalVersionUnix = uint64(t.Unix())
|
globalVersionUnix = uint64(t.Unix())
|
||||||
|
@ -399,6 +399,9 @@ var (
|
|||||||
|
|
||||||
// Set last client perf extra time (get lock, and validate)
|
// Set last client perf extra time (get lock, and validate)
|
||||||
globalLastClientPerfExtraTime int64
|
globalLastClientPerfExtraTime int64
|
||||||
|
|
||||||
|
// Captures all batch jobs metrics globally
|
||||||
|
globalBatchJobsMetrics batchJobMetrics
|
||||||
// Add new variable global values here.
|
// Add new variable global values here.
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user