mirror of
https://github.com/minio/minio.git
synced 2025-11-22 02:35:30 -05:00
Add support for batch job cancellation (#16843)
This commit is contained in:
@@ -234,6 +234,7 @@ type BatchJobRequest struct {
|
||||
Started time.Time `yaml:"-" json:"started"`
|
||||
Location string `yaml:"-" json:"location"`
|
||||
Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"`
|
||||
ctx context.Context `msg:"-"`
|
||||
}
|
||||
|
||||
// Notify notifies notification endpoint if configured regarding job failure or success.
|
||||
@@ -859,9 +860,7 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error {
|
||||
}
|
||||
|
||||
func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) {
|
||||
if j.Replicate != nil {
|
||||
deleteConfig(ctx, api, pathJoin(j.Location, batchReplName))
|
||||
}
|
||||
deleteConfig(ctx, api, pathJoin(j.Location, batchReplName))
|
||||
globalBatchJobsMetrics.delete(j.ID)
|
||||
deleteConfig(ctx, api, j.Location)
|
||||
}
|
||||
@@ -1061,6 +1060,34 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request)
|
||||
writeSuccessResponseJSON(w, buf)
|
||||
}
|
||||
|
||||
// CancelBatchJob cancels a job in progress
|
||||
func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "CancelBatchJob")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.CancelBatchJobAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
jobID := r.Form.Get("id")
|
||||
if jobID == "" {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL)
|
||||
return
|
||||
}
|
||||
if err := globalBatchJobPool.canceler(jobID, true); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrInvalidRequest, err), r.URL)
|
||||
return
|
||||
}
|
||||
j := BatchJobRequest{
|
||||
ID: jobID,
|
||||
Location: pathJoin(batchJobPrefix, jobID),
|
||||
}
|
||||
j.delete(ctx, objectAPI)
|
||||
|
||||
writeSuccessNoContent(w)
|
||||
}
|
||||
|
||||
//msgp:ignore BatchJobPool
|
||||
|
||||
// BatchJobPool batch job pool
|
||||
@@ -1070,6 +1097,8 @@ type BatchJobPool struct {
|
||||
once sync.Once
|
||||
mu sync.Mutex
|
||||
jobCh chan *BatchJobRequest
|
||||
jmu sync.Mutex // protects jobCancelers
|
||||
jobCancelers map[string]context.CancelFunc
|
||||
workerKillCh chan struct{}
|
||||
workerSize int
|
||||
}
|
||||
@@ -1083,6 +1112,7 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP
|
||||
objLayer: o,
|
||||
jobCh: make(chan *BatchJobRequest, 10000),
|
||||
workerKillCh: make(chan struct{}, workers),
|
||||
jobCancelers: make(map[string]context.CancelFunc),
|
||||
}
|
||||
jpool.ResizeWorkers(workers)
|
||||
jpool.resume()
|
||||
@@ -1124,15 +1154,17 @@ func (j *BatchJobPool) AddWorker() {
|
||||
return
|
||||
}
|
||||
if job.Replicate != nil {
|
||||
if err := job.Replicate.Start(j.ctx, j.objLayer, *job); err != nil {
|
||||
if err := job.Replicate.Start(job.ctx, j.objLayer, *job); err != nil {
|
||||
if !isErrBucketNotFound(err) {
|
||||
logger.LogIf(j.ctx, err)
|
||||
j.canceler(job.ID, false)
|
||||
continue
|
||||
}
|
||||
// Bucket not found proceed to delete such a job.
|
||||
}
|
||||
}
|
||||
job.delete(j.ctx, j.objLayer)
|
||||
j.canceler(job.ID, false)
|
||||
case <-j.workerKillCh:
|
||||
return
|
||||
}
|
||||
@@ -1162,6 +1194,12 @@ func (j *BatchJobPool) queueJob(req *BatchJobRequest) error {
|
||||
if j == nil {
|
||||
return errInvalidArgument
|
||||
}
|
||||
jctx, jcancel := context.WithCancel(j.ctx)
|
||||
j.jmu.Lock()
|
||||
j.jobCancelers[req.ID] = jcancel
|
||||
j.jmu.Unlock()
|
||||
req.ctx = jctx
|
||||
|
||||
select {
|
||||
case <-j.ctx.Done():
|
||||
j.once.Do(func() {
|
||||
@@ -1174,6 +1212,23 @@ func (j *BatchJobPool) queueJob(req *BatchJobRequest) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// delete canceler from the map, cancel job if requested
|
||||
func (j *BatchJobPool) canceler(jobID string, cancel bool) error {
|
||||
if j == nil {
|
||||
return errInvalidArgument
|
||||
}
|
||||
j.jmu.Lock()
|
||||
defer j.jmu.Unlock()
|
||||
if canceler, ok := j.jobCancelers[jobID]; ok {
|
||||
if cancel {
|
||||
canceler()
|
||||
|
||||
}
|
||||
}
|
||||
delete(j.jobCancelers, jobID)
|
||||
return nil
|
||||
}
|
||||
|
||||
//msgp:ignore batchJobMetrics
|
||||
type batchJobMetrics struct {
|
||||
sync.RWMutex
|
||||
|
||||
Reference in New Issue
Block a user