fix: add proxyByNode for batch jobs as part of their jobId (#17844)

This commit is contained in:
Harshavardhana 2023-08-11 13:12:35 -07:00 committed by GitHub
parent 5f56f441bf
commit b760137e1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 23 additions and 9 deletions

View File

@ -1025,7 +1025,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
if exists && !nh.hasEnded() && len(nh.currentStatus.Items) > 0 { if exists && !nh.hasEnded() && len(nh.currentStatus.Items) > 0 {
clientToken := nh.clientToken clientToken := nh.clientToken
if globalIsDistErasure { if globalIsDistErasure {
clientToken = fmt.Sprintf("%s@%d", nh.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) clientToken = fmt.Sprintf("%s:%d", nh.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
} }
b, err := json.Marshal(madmin.HealStartSuccess{ b, err := json.Marshal(madmin.HealStartSuccess{
ClientToken: clientToken, ClientToken: clientToken,

View File

@ -253,7 +253,7 @@ func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIError) {
} else { } else {
clientToken := he.clientToken clientToken := he.clientToken
if globalIsDistErasure { if globalIsDistErasure {
clientToken = fmt.Sprintf("%s@%d", he.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) clientToken = fmt.Sprintf("%s:%d", he.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
} }
hsp = madmin.HealStopSuccess{ hsp = madmin.HealStopSuccess{
@ -327,7 +327,7 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence, objAPI ObjectLay
clientToken := h.clientToken clientToken := h.clientToken
if globalIsDistErasure { if globalIsDistErasure {
clientToken = fmt.Sprintf("%s@%d", h.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) clientToken = fmt.Sprintf("%s:%d", h.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
} }
b, err := json.Marshal(madmin.HealStartSuccess{ b, err := json.Marshal(madmin.HealStartSuccess{

View File

@ -46,6 +46,7 @@ import (
"github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http" xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console" "github.com/minio/pkg/console"
"github.com/minio/pkg/env" "github.com/minio/pkg/env"
@ -1526,14 +1527,14 @@ func (a adminAPIHandlers) DescribeBatchJob(w http.ResponseWriter, r *http.Reques
return return
} }
id := r.Form.Get("jobId") jobID := r.Form.Get("jobId")
if id == "" { if jobID == "" {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL)
return return
} }
req := &BatchJobRequest{} req := &BatchJobRequest{}
if err := req.load(ctx, objectAPI, pathJoin(batchJobPrefix, id)); err != nil { if err := req.load(ctx, objectAPI, pathJoin(batchJobPrefix, jobID)); err != nil {
if !errors.Is(err, errNoSuchJob) { if !errors.Is(err, errNoSuchJob) {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
@ -1561,7 +1562,7 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request)
return return
} }
buf, err := io.ReadAll(r.Body) buf, err := io.ReadAll(ioutil.HardLimitReader(r.Body, humanize.MiByte*4))
if err != nil { if err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return return
@ -1578,7 +1579,7 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request)
return return
} }
job.ID = shortuuid.New() job.ID = fmt.Sprintf("%s:%d", shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints))
job.User = user job.User = user
job.Started = time.Now() job.Started = time.Now()
@ -1614,19 +1615,27 @@ func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request)
if objectAPI == nil { if objectAPI == nil {
return return
} }
jobID := r.Form.Get("id") jobID := r.Form.Get("id")
if jobID == "" { if jobID == "" {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL)
return return
} }
if _, success := proxyRequestByToken(ctx, w, r, jobID); success {
return
}
if err := globalBatchJobPool.canceler(jobID, true); err != nil { if err := globalBatchJobPool.canceler(jobID, true); err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrInvalidRequest, err), r.URL) writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrInvalidRequest, err), r.URL)
return return
} }
j := BatchJobRequest{ j := BatchJobRequest{
ID: jobID, ID: jobID,
Location: pathJoin(batchJobPrefix, jobID), Location: pathJoin(batchJobPrefix, jobID),
} }
j.delete(ctx, objectAPI) j.delete(ctx, objectAPI)
writeSuccessNoContent(w) writeSuccessNoContent(w)
@ -1681,6 +1690,11 @@ func (j *BatchJobPool) resume() {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
} }
_, nodeIdx := parseRequestToken(req.ID)
if nodeIdx > -1 && GetProxyEndpointLocalIndex(globalProxyEndpoints) != nodeIdx {
// This job doesn't belong on this node.
continue
}
if err := j.queueJob(req); err != nil { if err := j.queueJob(req); err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue

View File

@ -231,7 +231,7 @@ func parseRequestToken(token string) (subToken string, nodeIndex int) {
if token == "" { if token == "" {
return token, -1 return token, -1
} }
i := strings.Index(token, "@") i := strings.Index(token, ":")
if i < 0 { if i < 0 {
return token, -1 return token, -1
} }