diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 04632ef9c..aec50dd09 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -277,23 +277,28 @@ type BatchJobRequest struct { ctx context.Context `msg:"-"` } -// Notify notifies notification endpoint if configured regarding job failure or success. -func (r BatchJobReplicateV1) Notify(ctx context.Context, body io.Reader) error { - if r.Flags.Notify.Endpoint == "" { +func notifyEndpoint(ctx context.Context, ri *batchJobInfo, endpoint, token string) error { + if endpoint == "" { return nil } + buf, err := json.Marshal(ri) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.Flags.Notify.Endpoint, body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(buf)) if err != nil { return err } - if r.Flags.Notify.Token != "" { - req.Header.Set("Authorization", r.Flags.Notify.Token) + if token != "" { + req.Header.Set("Authorization", token) } + req.Header.Set("Content-Type", "application/json") clnt := http.Client{Transport: getRemoteInstanceTransport} resp, err := clnt.Do(req) @@ -309,6 +314,11 @@ func (r BatchJobReplicateV1) Notify(ctx context.Context, body io.Reader) error { return nil } +// Notify notifies notification endpoint if configured regarding job failure or success. +func (r BatchJobReplicateV1) Notify(ctx context.Context, ri *batchJobInfo) error { + return notifyEndpoint(ctx, ri, r.Flags.Notify.Endpoint, r.Flags.Notify.Token) +} + // ReplicateFromSource - this is not implemented yet where source is 'remote' and target is local. func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, core *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error { srcBucket := r.Source.Bucket @@ -643,8 +653,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay // persist in-memory state to disk. logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job)) - buf, _ := json.Marshal(ri) - if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil { + if err := r.Notify(ctx, ri); err != nil { logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err)) } @@ -1191,8 +1200,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba // persist in-memory state to disk. logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job)) - buf, _ := json.Marshal(ri) - if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil { + if err := r.Notify(ctx, ri); err != nil { logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err)) } diff --git a/cmd/batch-rotate.go b/cmd/batch-rotate.go index ad23d84a4..0954c81ba 100644 --- a/cmd/batch-rotate.go +++ b/cmd/batch-rotate.go @@ -18,13 +18,9 @@ package cmd import ( - "bytes" "context" "encoding/base64" - "encoding/json" - "errors" "fmt" - "io" "math/rand" "net/http" "runtime" @@ -214,35 +210,8 @@ type BatchJobKeyRotateV1 struct { } // Notify notifies notification endpoint if configured regarding job failure or success. -func (r BatchJobKeyRotateV1) Notify(ctx context.Context, body io.Reader) error { - if r.Flags.Notify.Endpoint == "" { - return nil - } - - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.Flags.Notify.Endpoint, body) - if err != nil { - return err - } - - if r.Flags.Notify.Token != "" { - req.Header.Set("Authorization", r.Flags.Notify.Token) - } - - clnt := http.Client{Transport: getRemoteInstanceTransport} - resp, err := clnt.Do(req) - if err != nil { - return err - } - - xhttp.DrainBody(resp.Body) - if resp.StatusCode != http.StatusOK { - return errors.New(resp.Status) - } - - return nil +func (r BatchJobKeyRotateV1) Notify(ctx context.Context, ri *batchJobInfo) error { + return notifyEndpoint(ctx, ri, r.Flags.Notify.Endpoint, r.Flags.Notify.Token) } // KeyRotate rotates encryption key of an object @@ -330,7 +299,7 @@ const ( batchKeyRotateVersion = batchKeyRotateVersionV1 batchKeyRotateAPIVersion = "v1" batchKeyRotateJobDefaultRetries = 3 - batchKeyRotateJobDefaultRetryDelay = 250 * time.Millisecond + batchKeyRotateJobDefaultRetryDelay = 25 * time.Millisecond ) // Start the batch key rottion job, resumes if there was a pending job via "job.ID" @@ -351,6 +320,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba if delay == 0 { delay = batchKeyRotateJobDefaultRetryDelay } + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) skip := func(info FileInfo) (ok bool) { @@ -475,6 +445,9 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba if success { break } + if delay > 0 { + time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay))) + } } }() } @@ -486,20 +459,11 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba // persist in-memory state to disk. logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job)) - buf, _ := json.Marshal(ri) - if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil { + if err := r.Notify(ctx, ri); err != nil { logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err)) } cancel() - if ri.Failed { - ri.ObjectsFailed = 0 - ri.Bucket = "" - ri.Object = "" - ri.Objects = 0 - time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay))) - } - return nil }