mirror of
https://github.com/minio/minio.git
synced 2025-03-30 17:23:42 -04:00
fix: apply common notification code with content-type (#17843)
This commit is contained in:
parent
96a22bfcbb
commit
5f56f441bf
@ -277,23 +277,28 @@ type BatchJobRequest struct {
|
|||||||
ctx context.Context `msg:"-"`
|
ctx context.Context `msg:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify notifies notification endpoint if configured regarding job failure or success.
|
func notifyEndpoint(ctx context.Context, ri *batchJobInfo, endpoint, token string) error {
|
||||||
func (r BatchJobReplicateV1) Notify(ctx context.Context, body io.Reader) error {
|
if endpoint == "" {
|
||||||
if r.Flags.Notify.Endpoint == "" {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buf, err := json.Marshal(ri)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.Flags.Notify.Endpoint, body)
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(buf))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Flags.Notify.Token != "" {
|
if token != "" {
|
||||||
req.Header.Set("Authorization", r.Flags.Notify.Token)
|
req.Header.Set("Authorization", token)
|
||||||
}
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
clnt := http.Client{Transport: getRemoteInstanceTransport}
|
clnt := http.Client{Transport: getRemoteInstanceTransport}
|
||||||
resp, err := clnt.Do(req)
|
resp, err := clnt.Do(req)
|
||||||
@ -309,6 +314,11 @@ func (r BatchJobReplicateV1) Notify(ctx context.Context, body io.Reader) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notify notifies notification endpoint if configured regarding job failure or success.
|
||||||
|
func (r BatchJobReplicateV1) Notify(ctx context.Context, ri *batchJobInfo) error {
|
||||||
|
return notifyEndpoint(ctx, ri, r.Flags.Notify.Endpoint, r.Flags.Notify.Token)
|
||||||
|
}
|
||||||
|
|
||||||
// ReplicateFromSource - this is not implemented yet where source is 'remote' and target is local.
|
// ReplicateFromSource - this is not implemented yet where source is 'remote' and target is local.
|
||||||
func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, core *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error {
|
func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, core *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error {
|
||||||
srcBucket := r.Source.Bucket
|
srcBucket := r.Source.Bucket
|
||||||
@ -643,8 +653,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
|
|||||||
// persist in-memory state to disk.
|
// persist in-memory state to disk.
|
||||||
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
|
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
|
||||||
|
|
||||||
buf, _ := json.Marshal(ri)
|
if err := r.Notify(ctx, ri); err != nil {
|
||||||
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
|
logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1191,8 +1200,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
|
|||||||
// persist in-memory state to disk.
|
// persist in-memory state to disk.
|
||||||
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
|
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
|
||||||
|
|
||||||
buf, _ := json.Marshal(ri)
|
if err := r.Notify(ctx, ri); err != nil {
|
||||||
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
|
logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,13 +18,9 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime"
|
"runtime"
|
||||||
@ -214,35 +210,8 @@ type BatchJobKeyRotateV1 struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Notify notifies notification endpoint if configured regarding job failure or success.
|
// Notify notifies notification endpoint if configured regarding job failure or success.
|
||||||
func (r BatchJobKeyRotateV1) Notify(ctx context.Context, body io.Reader) error {
|
func (r BatchJobKeyRotateV1) Notify(ctx context.Context, ri *batchJobInfo) error {
|
||||||
if r.Flags.Notify.Endpoint == "" {
|
return notifyEndpoint(ctx, ri, r.Flags.Notify.Endpoint, r.Flags.Notify.Token)
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.Flags.Notify.Endpoint, body)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.Flags.Notify.Token != "" {
|
|
||||||
req.Header.Set("Authorization", r.Flags.Notify.Token)
|
|
||||||
}
|
|
||||||
|
|
||||||
clnt := http.Client{Transport: getRemoteInstanceTransport}
|
|
||||||
resp, err := clnt.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
xhttp.DrainBody(resp.Body)
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return errors.New(resp.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// KeyRotate rotates encryption key of an object
|
// KeyRotate rotates encryption key of an object
|
||||||
@ -330,7 +299,7 @@ const (
|
|||||||
batchKeyRotateVersion = batchKeyRotateVersionV1
|
batchKeyRotateVersion = batchKeyRotateVersionV1
|
||||||
batchKeyRotateAPIVersion = "v1"
|
batchKeyRotateAPIVersion = "v1"
|
||||||
batchKeyRotateJobDefaultRetries = 3
|
batchKeyRotateJobDefaultRetries = 3
|
||||||
batchKeyRotateJobDefaultRetryDelay = 250 * time.Millisecond
|
batchKeyRotateJobDefaultRetryDelay = 25 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
// Start the batch key rottion job, resumes if there was a pending job via "job.ID"
|
// Start the batch key rottion job, resumes if there was a pending job via "job.ID"
|
||||||
@ -351,6 +320,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
|
|||||||
if delay == 0 {
|
if delay == 0 {
|
||||||
delay = batchKeyRotateJobDefaultRetryDelay
|
delay = batchKeyRotateJobDefaultRetryDelay
|
||||||
}
|
}
|
||||||
|
|
||||||
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
skip := func(info FileInfo) (ok bool) {
|
skip := func(info FileInfo) (ok bool) {
|
||||||
@ -475,6 +445,9 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
|
|||||||
if success {
|
if success {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
if delay > 0 {
|
||||||
|
time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -486,20 +459,11 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
|
|||||||
// persist in-memory state to disk.
|
// persist in-memory state to disk.
|
||||||
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
|
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
|
||||||
|
|
||||||
buf, _ := json.Marshal(ri)
|
if err := r.Notify(ctx, ri); err != nil {
|
||||||
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
|
logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
if ri.Failed {
|
|
||||||
ri.ObjectsFailed = 0
|
|
||||||
ri.Bucket = ""
|
|
||||||
ri.Object = ""
|
|
||||||
ri.Objects = 0
|
|
||||||
time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay)))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user