minio/cmd/batch-handlers.go

1813 lines
49 KiB
Go

// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"path"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/dustin/go-humanize"
"github.com/lithammer/shortuuid/v4"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio-go/v7"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/auth"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/workers"
"github.com/minio/pkg/console"
"github.com/minio/pkg/env"
iampolicy "github.com/minio/pkg/iam/policy"
"github.com/minio/pkg/wildcard"
"gopkg.in/yaml.v2"
)
// replicate:
// # source of the objects to be replicated
// source:
// type: "minio"
// bucket: "testbucket"
// prefix: "spark/"
//
// # optional flags based filtering criteria
// # for source objects
// flags:
// filter:
// newerThan: "7d"
// olderThan: "7d"
// createdAfter: "date"
// createdBefore: "date"
// tags:
// - key: "name"
// value: "value*"
// metadata:
// - key: "content-type"
// value: "image/*"
// notify:
// endpoint: "https://splunk-hec.dev.com"
// token: "Splunk ..." # e.g. "Bearer token"
//
// # target where the objects must be replicated
// target:
// type: "minio"
// bucket: "testbucket1"
// endpoint: "https://play.min.io"
// credentials:
// accessKey: "minioadmin"
// secretKey: "minioadmin"
// sessionToken: ""
// BatchJobReplicateKV is a datatype that holds key and values for filtering of objects
// used by metadata filter as well as tags based filtering.
type BatchJobReplicateKV struct {
Key string `yaml:"key" json:"key"`
Value string `yaml:"value" json:"value"`
}
// Validate returns an error if key is empty
func (kv BatchJobReplicateKV) Validate() error {
if kv.Key == "" {
return errInvalidArgument
}
return nil
}
// Empty indicates if kv is not set
func (kv BatchJobReplicateKV) Empty() bool {
return kv.Key == "" && kv.Value == ""
}
// Match matches input kv with kv, value will be wildcard matched depending on the user input
func (kv BatchJobReplicateKV) Match(ikv BatchJobReplicateKV) bool {
if kv.Empty() {
return true
}
if strings.EqualFold(kv.Key, ikv.Key) {
return wildcard.Match(kv.Value, ikv.Value)
}
return false
}
// BatchReplicateRetry datatype represents total retry attempts and delay between each retries.
type BatchReplicateRetry struct {
Attempts int `yaml:"attempts" json:"attempts"` // number of retry attempts
Delay time.Duration `yaml:"delay" json:"delay"` // delay between each retries
}
// Validate validates input replicate retries.
func (r BatchReplicateRetry) Validate() error {
if r.Attempts < 0 {
return errInvalidArgument
}
if r.Delay < 0 {
return errInvalidArgument
}
return nil
}
// BatchReplicateFilter holds all the filters currently supported for batch replication
type BatchReplicateFilter struct {
NewerThan time.Duration `yaml:"newerThan,omitempty" json:"newerThan"`
OlderThan time.Duration `yaml:"olderThan,omitempty" json:"olderThan"`
CreatedAfter time.Time `yaml:"createdAfter,omitempty" json:"createdAfter"`
CreatedBefore time.Time `yaml:"createdBefore,omitempty" json:"createdBefore"`
Tags []BatchJobReplicateKV `yaml:"tags,omitempty" json:"tags"`
Metadata []BatchJobReplicateKV `yaml:"metadata,omitempty" json:"metadata"`
}
// BatchReplicateNotification success or failure notification endpoint for each job attempts
type BatchReplicateNotification struct {
Endpoint string `yaml:"endpoint" json:"endpoint"`
Token string `yaml:"token" json:"token"`
}
// BatchJobReplicateFlags various configurations for replication job definition currently includes
// - filter
// - notify
// - retry
type BatchJobReplicateFlags struct {
Filter BatchReplicateFilter `yaml:"filter" json:"filter"`
Notify BatchReplicateNotification `yaml:"notify" json:"notify"`
Retry BatchReplicateRetry `yaml:"retry" json:"retry"`
}
// BatchJobReplicateResourceType defines the type of batch jobs
type BatchJobReplicateResourceType string
// Validate validates if the replicate resource type is recognized and supported
func (t BatchJobReplicateResourceType) Validate() error {
switch t {
case BatchJobReplicateResourceMinIO:
default:
return errInvalidArgument
}
return nil
}
// Different types of batch jobs..
const (
BatchJobReplicateResourceMinIO BatchJobReplicateResourceType = "minio"
// add future targets
)
// BatchJobReplicateCredentials access credentials for batch replication it may
// be either for target or source.
type BatchJobReplicateCredentials struct {
AccessKey string `xml:"AccessKeyId" json:"accessKey,omitempty" yaml:"accessKey"`
SecretKey string `xml:"SecretAccessKey" json:"secretKey,omitempty" yaml:"secretKey"`
SessionToken string `xml:"SessionToken" json:"sessionToken,omitempty" yaml:"sessionToken"`
}
// Empty indicates if credentials are not set
func (c BatchJobReplicateCredentials) Empty() bool {
return c.AccessKey == "" && c.SecretKey == "" && c.SessionToken == ""
}
// Validate validates if credentials are valid
func (c BatchJobReplicateCredentials) Validate() error {
if !auth.IsAccessKeyValid(c.AccessKey) || !auth.IsSecretKeyValid(c.SecretKey) {
return errInvalidArgument
}
return nil
}
// BatchJobReplicateTarget describes target element of the replication job that receives
// the filtered data from source
type BatchJobReplicateTarget struct {
Type BatchJobReplicateResourceType `yaml:"type" json:"type"`
Bucket string `yaml:"bucket" json:"bucket"`
Prefix string `yaml:"prefix" json:"prefix"`
Endpoint string `yaml:"endpoint" json:"endpoint"`
Creds BatchJobReplicateCredentials `yaml:"credentials" json:"credentials"`
}
// BatchJobReplicateSource describes source element of the replication job that is
// the source of the data for the target
type BatchJobReplicateSource struct {
Type BatchJobReplicateResourceType `yaml:"type" json:"type"`
Bucket string `yaml:"bucket" json:"bucket"`
Prefix string `yaml:"prefix" json:"prefix"`
Endpoint string `yaml:"endpoint" json:"endpoint"`
Creds BatchJobReplicateCredentials `yaml:"credentials" json:"credentials"`
}
// BatchJobReplicateV1 v1 of batch job replication
type BatchJobReplicateV1 struct {
APIVersion string `yaml:"apiVersion" json:"apiVersion"`
Flags BatchJobReplicateFlags `yaml:"flags" json:"flags"`
Target BatchJobReplicateTarget `yaml:"target" json:"target"`
Source BatchJobReplicateSource `yaml:"source" json:"source"`
clnt *miniogo.Core `msg:"-"`
}
// RemoteToLocal returns true if source is remote and target is local
func (r BatchJobReplicateV1) RemoteToLocal() bool {
return !r.Source.Creds.Empty()
}
// BatchJobRequest this is an internal data structure not for external consumption.
type BatchJobRequest struct {
ID string `yaml:"-" json:"name"`
User string `yaml:"-" json:"user"`
Started time.Time `yaml:"-" json:"started"`
Location string `yaml:"-" json:"location"`
Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"`
KeyRotate *BatchJobKeyRotateV1 `yaml:"keyrotate" json:"keyrotate"`
ctx context.Context `msg:"-"`
}
// Notify notifies notification endpoint if configured regarding job failure or success.
func (r BatchJobReplicateV1) Notify(ctx context.Context, body io.Reader) error {
if r.Flags.Notify.Endpoint == "" {
return nil
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.Flags.Notify.Endpoint, body)
if err != nil {
return err
}
if r.Flags.Notify.Token != "" {
req.Header.Set("Authorization", r.Flags.Notify.Token)
}
clnt := http.Client{Transport: getRemoteInstanceTransport}
resp, err := clnt.Do(req)
if err != nil {
return err
}
xhttp.DrainBody(resp.Body)
if resp.StatusCode != http.StatusOK {
return errors.New(resp.Status)
}
return nil
}
// ReplicateFromSource - this is not implemented yet where source is 'remote' and target is local.
func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, core *minio.Core, srcObjInfo ObjectInfo, retry bool) error {
srcBucket := r.Source.Bucket
tgtBucket := r.Target.Bucket
srcObject := srcObjInfo.Name
tgtObject := srcObjInfo.Name
if r.Target.Prefix != "" {
tgtObject = path.Join(r.Target.Prefix, srcObjInfo.Name)
}
versioned := globalBucketVersioningSys.PrefixEnabled(tgtBucket, tgtObject)
versionSuspended := globalBucketVersioningSys.PrefixSuspended(tgtBucket, tgtObject)
if srcObjInfo.DeleteMarker {
_, err := api.DeleteObject(ctx, tgtBucket, tgtObject, ObjectOptions{
VersionID: srcObjInfo.VersionID,
VersionSuspended: versionSuspended,
Versioned: versioned,
MTime: srcObjInfo.ModTime,
DeleteMarker: srcObjInfo.DeleteMarker,
ReplicationRequest: true,
})
return err
}
opts := ObjectOptions{
VersionID: srcObjInfo.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
MTime: srcObjInfo.ModTime,
PreserveETag: srcObjInfo.ETag,
UserDefined: srcObjInfo.UserDefined,
}
if crypto.S3.IsEncrypted(srcObjInfo.UserDefined) {
opts.ServerSideEncryption = encrypt.NewSSE()
}
slc := strings.Split(srcObjInfo.ETag, "-")
if len(slc) == 2 {
partsCount, err := strconv.Atoi(slc[1])
if err != nil {
return err
}
return r.copyWithMultipartfromSource(ctx, api, core, srcObjInfo, opts, partsCount)
}
gopts := minio.GetObjectOptions{
VersionID: srcObjInfo.VersionID,
}
if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
return err
}
rd, objInfo, _, err := core.GetObject(ctx, srcBucket, srcObject, gopts)
if err != nil {
return err
}
defer rd.Close()
hr, err := hash.NewReader(rd, objInfo.Size, "", "", objInfo.Size)
if err != nil {
return err
}
pReader := NewPutObjReader(hr)
_, err = api.PutObject(ctx, tgtBucket, tgtObject, pReader, opts)
return err
}
func (r *BatchJobReplicateV1) copyWithMultipartfromSource(ctx context.Context, api ObjectLayer, c *minio.Core, srcObjInfo ObjectInfo, opts ObjectOptions, partsCount int) (err error) {
srcBucket := r.Source.Bucket
tgtBucket := r.Target.Bucket
srcObject := srcObjInfo.Name
tgtObject := srcObjInfo.Name
if r.Target.Prefix != "" {
tgtObject = path.Join(r.Target.Prefix, srcObjInfo.Name)
}
var uploadedParts []CompletePart
res, err := api.NewMultipartUpload(context.Background(), tgtBucket, tgtObject, opts)
if err != nil {
return err
}
defer func() {
if err != nil {
// block and abort remote upload upon failure.
attempts := 1
for attempts <= 3 {
aerr := api.AbortMultipartUpload(ctx, tgtBucket, tgtObject, res.UploadID, ObjectOptions{})
if aerr == nil {
return
}
logger.LogIf(ctx,
fmt.Errorf("trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster",
humanize.Ordinal(attempts), res.UploadID, tgtBucket, tgtObject, aerr))
attempts++
time.Sleep(time.Second)
}
}
}()
var (
hr *hash.Reader
pInfo PartInfo
)
for i := 0; i < partsCount; i++ {
gopts := minio.GetObjectOptions{
VersionID: srcObjInfo.VersionID,
PartNumber: i + 1,
}
if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
return err
}
rd, objInfo, _, err := c.GetObject(ctx, srcBucket, srcObject, gopts)
if err != nil {
return err
}
defer rd.Close()
hr, err = hash.NewReader(rd, objInfo.Size, "", "", objInfo.Size)
if err != nil {
return err
}
pReader := NewPutObjReader(hr)
opts.PreserveETag = ""
pInfo, err = api.PutObjectPart(ctx, tgtBucket, tgtObject, res.UploadID, i+1, pReader, opts)
if err != nil {
return err
}
if pInfo.Size != objInfo.Size {
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, objInfo.Size)
}
uploadedParts = append(uploadedParts, CompletePart{
PartNumber: pInfo.PartNumber,
ETag: pInfo.ETag,
})
}
_, err = api.CompleteMultipartUpload(ctx, tgtBucket, tgtObject, res.UploadID, uploadedParts, opts)
return err
}
// StartFromSource starts the batch replication job from remote source, resumes if there was a pending job via "job.ID"
func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
ri := &batchJobInfo{
JobID: job.ID,
JobType: string(job.Type()),
StartTime: job.Started,
}
if err := ri.load(ctx, api, job); err != nil {
return err
}
globalBatchJobsMetrics.save(job.ID, ri)
delay := job.Replicate.Flags.Retry.Delay
if delay == 0 {
delay = batchReplJobDefaultRetryDelay
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
skip := func(oi ObjectInfo) (ok bool) {
if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan {
// skip all objects that are newer than specified older duration
return true
}
if r.Flags.Filter.NewerThan > 0 && time.Since(oi.ModTime) >= r.Flags.Filter.NewerThan {
// skip all objects that are older than specified newer duration
return true
}
if !r.Flags.Filter.CreatedAfter.IsZero() && r.Flags.Filter.CreatedAfter.Before(oi.ModTime) {
// skip all objects that are created before the specified time.
return true
}
if !r.Flags.Filter.CreatedBefore.IsZero() && r.Flags.Filter.CreatedBefore.After(oi.ModTime) {
// skip all objects that are created after the specified time.
return true
}
if len(r.Flags.Filter.Tags) > 0 {
// Only parse object tags if tags filter is specified.
tagMap := map[string]string{}
tagStr := oi.UserTags
if len(tagStr) != 0 {
t, err := tags.ParseObjectTags(tagStr)
if err != nil {
return false
}
tagMap = t.ToMap()
}
for _, kv := range r.Flags.Filter.Tags {
for t, v := range tagMap {
if kv.Match(BatchJobReplicateKV{Key: t, Value: v}) {
return true
}
}
}
// None of the provided tags filter match skip the object
return false
}
if len(r.Flags.Filter.Metadata) > 0 {
for _, kv := range r.Flags.Filter.Metadata {
for k, v := range oi.UserDefined {
if !strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") && !isStandardHeader(k) {
continue
}
// We only need to match x-amz-meta or standardHeaders
if kv.Match(BatchJobReplicateKV{Key: k, Value: v}) {
return true
}
}
}
// None of the provided metadata filters match skip the object.
return false
}
return false
}
u, err := url.Parse(r.Source.Endpoint)
if err != nil {
return err
}
cred := r.Source.Creds
c, err := miniogo.New(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
})
if err != nil {
return err
}
c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
core := &minio.Core{Client: c}
workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
if err != nil {
return err
}
wk, err := workers.New(workerSize)
if err != nil {
// invalid worker size.
return err
}
retryAttempts := ri.RetryAttempts
retry := false
for attempts := 1; attempts <= retryAttempts; attempts++ {
attempts := attempts
ctx, cancel := context.WithCancel(ctx)
objInfoCh := c.ListObjects(ctx, r.Source.Bucket, minio.ListObjectsOptions{
Prefix: r.Source.Prefix,
WithVersions: true,
Recursive: true,
WithMetadata: true,
})
for obj := range objInfoCh {
oi := toObjectInfo(r.Source.Bucket, obj.Key, obj)
if skip(oi) {
continue
}
wk.Take()
go func() {
defer wk.Give()
stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, oi)
success := true
if err := r.ReplicateFromSource(ctx, api, core, oi, retry); err != nil {
// object must be deleted concurrently, allow these failures but do not count them
if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
return
}
stopFn(err)
logger.LogIf(ctx, err)
success = false
} else {
stopFn(nil)
}
ri.trackCurrentBucketObject(r.Target.Bucket, oi, success)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
}()
}
wk.Wait()
ri.RetryAttempts = attempts
ri.Complete = ri.ObjectsFailed == 0
ri.Failed = ri.ObjectsFailed > 0
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
buf, _ := json.Marshal(ri)
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
}
cancel()
if ri.Failed {
ri.ObjectsFailed = 0
ri.Bucket = ""
ri.Object = ""
ri.Objects = 0
ri.BytesFailed = 0
ri.BytesTransferred = 0
retry = true // indicate we are retrying..
time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay)))
continue
}
break
}
return nil
}
// toObjectInfo converts minio.ObjectInfo to ObjectInfo
func toObjectInfo(bucket, object string, objInfo minio.ObjectInfo) ObjectInfo {
tags, _ := tags.MapToObjectTags(objInfo.UserTags)
oi := ObjectInfo{
Bucket: bucket,
Name: object,
ModTime: objInfo.LastModified,
Size: objInfo.Size,
ETag: objInfo.ETag,
VersionID: objInfo.VersionID,
IsLatest: objInfo.IsLatest,
DeleteMarker: objInfo.IsDeleteMarker,
ContentType: objInfo.ContentType,
Expires: objInfo.Expires,
StorageClass: objInfo.StorageClass,
ReplicationStatusInternal: objInfo.ReplicationStatus,
UserTags: tags.String(),
}
oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
for k, v := range objInfo.Metadata {
oi.UserDefined[k] = v[0]
}
ce, ok := oi.UserDefined[xhttp.ContentEncoding]
if !ok {
ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
}
if ok {
oi.ContentEncoding = ce
}
return oi
}
// ReplicateToTarget read from source and replicate to configured target
func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error {
srcBucket := r.Source.Bucket
tgtBucket := r.Target.Bucket
tgtPrefix := r.Target.Prefix
srcObject := srcObjInfo.Name
if srcObjInfo.DeleteMarker || !srcObjInfo.VersionPurgeStatus.Empty() {
if retry {
if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.StatObjectOptions{
VersionID: srcObjInfo.VersionID,
Internal: miniogo.AdvancedGetOptions{
ReplicationProxyRequest: "false",
},
}); isErrMethodNotAllowed(ErrorRespToObjectError(err, tgtBucket, pathJoin(tgtPrefix, srcObject))) {
return nil
}
}
versionID := srcObjInfo.VersionID
dmVersionID := ""
if srcObjInfo.VersionPurgeStatus.Empty() {
dmVersionID = srcObjInfo.VersionID
}
return c.RemoveObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.RemoveObjectOptions{
VersionID: versionID,
Internal: miniogo.AdvancedRemoveOptions{
ReplicationDeleteMarker: dmVersionID != "",
ReplicationMTime: srcObjInfo.ModTime,
ReplicationStatus: miniogo.ReplicationStatusReplica,
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
},
})
}
if retry { // when we are retrying avoid copying if necessary.
gopts := miniogo.GetObjectOptions{}
if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
return err
}
if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), gopts); err == nil {
return nil
}
}
versioned := globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject)
versionSuspended := globalBucketVersioningSys.PrefixSuspended(srcBucket, srcObject)
opts := ObjectOptions{
VersionID: srcObjInfo.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
}
rd, err := api.GetObjectNInfo(ctx, srcBucket, srcObject, nil, http.Header{}, opts)
if err != nil {
return err
}
defer rd.Close()
objInfo := rd.ObjInfo
size, err := objInfo.GetActualSize()
if err != nil {
return err
}
putOpts, err := batchReplicationOpts(ctx, "", objInfo)
if err != nil {
return err
}
if objInfo.isMultipart() {
if err := replicateObjectWithMultipart(ctx, c, tgtBucket, pathJoin(tgtPrefix, objInfo.Name), rd, objInfo, putOpts); err != nil {
return err
}
} else {
if _, err = c.PutObject(ctx, tgtBucket, pathJoin(tgtPrefix, objInfo.Name), rd, size, "", "", putOpts); err != nil {
return err
}
}
return nil
}
//go:generate msgp -file $GOFILE -unexported
// batchJobInfo current batch replication information
type batchJobInfo struct {
mu sync.RWMutex `json:"-" msg:"-"`
Version int `json:"-" msg:"v"`
JobID string `json:"jobID" msg:"jid"`
JobType string `json:"jobType" msg:"jt"`
StartTime time.Time `json:"startTime" msg:"st"`
LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
RetryAttempts int `json:"retryAttempts" msg:"ra"`
Complete bool `json:"complete" msg:"cmp"`
Failed bool `json:"failed" msg:"fld"`
// Last bucket/object batch replicated
Bucket string `json:"-" msg:"lbkt"`
Object string `json:"-" msg:"lobj"`
// Verbose information
Objects int64 `json:"objects" msg:"ob"`
DeleteMarkers int64 `json:"deleteMarkers" msg:"dm"`
ObjectsFailed int64 `json:"objectsFailed" msg:"obf"`
DeleteMarkersFailed int64 `json:"deleteMarkersFailed" msg:"dmf"`
BytesTransferred int64 `json:"bytesTransferred" msg:"bt"`
BytesFailed int64 `json:"bytesFailed" msg:"bf"`
}
const (
batchReplName = "batch-replicate.bin"
batchReplFormat = 1
batchReplVersionV1 = 1
batchReplVersion = batchReplVersionV1
batchJobName = "job.bin"
batchJobPrefix = "batch-jobs"
batchReplJobAPIVersion = "v1"
batchReplJobDefaultRetries = 3
batchReplJobDefaultRetryDelay = 250 * time.Millisecond
)
func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
var fileName string
var format, version uint16
switch {
case job.Replicate != nil:
fileName = batchReplName
version = batchReplVersionV1
format = batchReplFormat
case job.KeyRotate != nil:
fileName = batchKeyRotationName
version = batchKeyRotateVersionV1
format = batchKeyRotationFormat
}
data, err := readConfig(ctx, api, pathJoin(job.Location, fileName))
if err != nil {
if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
ri.Version = int(version)
switch {
case job.Replicate != nil:
ri.RetryAttempts = batchReplJobDefaultRetries
if job.Replicate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts
}
case job.KeyRotate != nil:
ri.RetryAttempts = batchKeyRotateJobDefaultRetries
if job.KeyRotate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts
}
}
return nil
}
return err
}
if len(data) == 0 {
// Seems to be empty create a new batchRepl object.
return nil
}
if len(data) <= 4 {
return fmt.Errorf("%s: no data", ri.JobType)
}
// Read header
switch binary.LittleEndian.Uint16(data[0:2]) {
case format:
default:
return fmt.Errorf("%s: unknown format: %d", ri.JobType, binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case version:
default:
return fmt.Errorf("%s: unknown version: %d", ri.JobType, binary.LittleEndian.Uint16(data[2:4]))
}
ri.mu.Lock()
defer ri.mu.Unlock()
// OK, parse data.
if _, err = ri.UnmarshalMsg(data[4:]); err != nil {
return err
}
switch ri.Version {
case batchReplVersionV1:
default:
return fmt.Errorf("unexpected batch %s meta version: %d", ri.JobType, ri.Version)
}
return nil
}
func (ri *batchJobInfo) clone() *batchJobInfo {
ri.mu.RLock()
defer ri.mu.RUnlock()
return &batchJobInfo{
Version: ri.Version,
JobID: ri.JobID,
JobType: ri.JobType,
RetryAttempts: ri.RetryAttempts,
Complete: ri.Complete,
Failed: ri.Failed,
StartTime: ri.StartTime,
LastUpdate: ri.LastUpdate,
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
ObjectsFailed: ri.ObjectsFailed,
BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed,
}
}
func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) {
if ri == nil {
return
}
if success {
if dmarker {
ri.DeleteMarkers++
} else {
ri.Objects++
ri.BytesTransferred += size
}
} else {
if dmarker {
ri.DeleteMarkersFailed++
} else {
ri.ObjectsFailed++
ri.BytesFailed += size
}
}
}
func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, duration time.Duration, job BatchJobRequest) error {
if ri == nil {
return errInvalidArgument
}
now := UTCNow()
ri.mu.Lock()
var (
format, version uint16
jobTyp, fileName string
)
if now.Sub(ri.LastUpdate) >= duration {
switch job.Type() {
case madmin.BatchJobReplicate:
format = batchReplFormat
version = batchReplVersion
jobTyp = string(job.Type())
fileName = batchReplName
ri.Version = batchReplVersionV1
case madmin.BatchJobKeyRotate:
format = batchKeyRotationFormat
version = batchKeyRotateVersion
jobTyp = string(job.Type())
fileName = batchKeyRotationName
ri.Version = batchKeyRotateVersionV1
default:
return errInvalidArgument
}
if serverDebugLog {
console.Debugf("%s: persisting info on drive: threshold:%s, %s:%#v\n", jobTyp, now.Sub(ri.LastUpdate), jobTyp, ri)
}
ri.LastUpdate = now
data := make([]byte, 4, ri.Msgsize()+4)
// Initialize the header.
binary.LittleEndian.PutUint16(data[0:2], format)
binary.LittleEndian.PutUint16(data[2:4], version)
buf, err := ri.MarshalMsg(data)
ri.mu.Unlock()
if err != nil {
return err
}
return saveConfig(ctx, api, pathJoin(job.Location, fileName), buf)
}
ri.mu.Unlock()
return nil
}
func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, failed bool) {
if ri == nil {
return
}
ri.mu.Lock()
defer ri.mu.Unlock()
ri.Bucket = bucket
ri.Object = info.Name
ri.countItem(info.Size, info.DeleteMarker, failed)
}
// Start start the batch replication job, resumes if there was a pending job via "job.ID"
func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
ri := &batchJobInfo{
JobID: job.ID,
JobType: string(job.Type()),
StartTime: job.Started,
}
if err := ri.load(ctx, api, job); err != nil {
return err
}
globalBatchJobsMetrics.save(job.ID, ri)
lastObject := ri.Object
delay := job.Replicate.Flags.Retry.Delay
if delay == 0 {
delay = batchReplJobDefaultRetryDelay
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
skip := func(info FileInfo) (ok bool) {
if r.Flags.Filter.OlderThan > 0 && time.Since(info.ModTime) < r.Flags.Filter.OlderThan {
// skip all objects that are newer than specified older duration
return false
}
if r.Flags.Filter.NewerThan > 0 && time.Since(info.ModTime) >= r.Flags.Filter.NewerThan {
// skip all objects that are older than specified newer duration
return false
}
if !r.Flags.Filter.CreatedAfter.IsZero() && r.Flags.Filter.CreatedAfter.Before(info.ModTime) {
// skip all objects that are created before the specified time.
return false
}
if !r.Flags.Filter.CreatedBefore.IsZero() && r.Flags.Filter.CreatedBefore.After(info.ModTime) {
// skip all objects that are created after the specified time.
return false
}
if len(r.Flags.Filter.Tags) > 0 {
// Only parse object tags if tags filter is specified.
tagMap := map[string]string{}
tagStr := info.Metadata[xhttp.AmzObjectTagging]
if len(tagStr) != 0 {
t, err := tags.ParseObjectTags(tagStr)
if err != nil {
return false
}
tagMap = t.ToMap()
}
for _, kv := range r.Flags.Filter.Tags {
for t, v := range tagMap {
if kv.Match(BatchJobReplicateKV{Key: t, Value: v}) {
return true
}
}
}
// None of the provided tags filter match skip the object
return false
}
if len(r.Flags.Filter.Metadata) > 0 {
for _, kv := range r.Flags.Filter.Metadata {
for k, v := range info.Metadata {
if !strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") && !isStandardHeader(k) {
continue
}
// We only need to match x-amz-meta or standardHeaders
if kv.Match(BatchJobReplicateKV{Key: k, Value: v}) {
return true
}
}
}
// None of the provided metadata filters match skip the object.
return false
}
return true
}
u, err := url.Parse(r.Target.Endpoint)
if err != nil {
return err
}
cred := r.Target.Creds
c, err := miniogo.NewCore(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
})
if err != nil {
return err
}
c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
if err != nil {
return err
}
wk, err := workers.New(workerSize)
if err != nil {
// invalid worker size.
return err
}
retryAttempts := ri.RetryAttempts
retry := false
for attempts := 1; attempts <= retryAttempts; attempts++ {
attempts := attempts
ctx, cancel := context.WithCancel(ctx)
results := make(chan ObjectInfo, 100)
if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, ObjectOptions{
WalkMarker: lastObject,
WalkFilter: skip,
}); err != nil {
cancel()
// Do not need to retry if we can't list objects on source.
return err
}
for result := range results {
result := result
wk.Take()
go func() {
defer wk.Give()
stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result)
success := true
if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil {
if miniogo.ToErrorResponse(err).Code == "PreconditionFailed" {
// pre-condition failed means we already have the object copied over.
return
}
// object must be deleted concurrently, allow these failures but do not count them
if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
return
}
stopFn(err)
logger.LogIf(ctx, err)
success = false
} else {
stopFn(nil)
}
ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
}()
}
wk.Wait()
ri.RetryAttempts = attempts
ri.Complete = ri.ObjectsFailed == 0
ri.Failed = ri.ObjectsFailed > 0
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
buf, _ := json.Marshal(ri)
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
}
cancel()
if ri.Failed {
ri.ObjectsFailed = 0
ri.Bucket = ""
ri.Object = ""
ri.Objects = 0
ri.BytesFailed = 0
ri.BytesTransferred = 0
retry = true // indicate we are retrying..
time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay)))
continue
}
break
}
return nil
}
//msgp:ignore batchReplicationJobError
type batchReplicationJobError struct {
Code string
Description string
HTTPStatusCode int
}
func (e batchReplicationJobError) Error() string {
return e.Description
}
// Validate validates the job definition input
func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest, o ObjectLayer) error {
if r == nil {
return nil
}
if r.APIVersion != batchReplJobAPIVersion {
return errInvalidArgument
}
if r.Source.Bucket == "" {
return errInvalidArgument
}
var isRemoteToLocal bool
localBkt := r.Source.Bucket
if r.Source.Endpoint != "" {
localBkt = r.Target.Bucket
isRemoteToLocal = true
}
info, err := o.GetBucketInfo(ctx, localBkt, BucketOptions{})
if err != nil {
if isErrBucketNotFound(err) {
return batchReplicationJobError{
Code: "NoSuchSourceBucket",
Description: fmt.Sprintf("The specified bucket %s does not exist", localBkt),
HTTPStatusCode: http.StatusNotFound,
}
}
return err
}
if err := r.Source.Type.Validate(); err != nil {
return err
}
if r.Source.Creds.Empty() && r.Target.Creds.Empty() {
return errInvalidArgument
}
if !r.Source.Creds.Empty() {
if err := r.Source.Creds.Validate(); err != nil {
return err
}
}
if r.Target.Endpoint == "" && !r.Target.Creds.Empty() {
return errInvalidArgument
}
if r.Source.Endpoint == "" && !r.Source.Creds.Empty() {
return errInvalidArgument
}
if r.Target.Bucket == "" {
return errInvalidArgument
}
if !r.Target.Creds.Empty() {
if err := r.Target.Creds.Validate(); err != nil {
return err
}
}
if r.Source.Creds.Empty() && r.Target.Creds.Empty() {
return errInvalidArgument
}
if err := r.Target.Type.Validate(); err != nil {
return err
}
for _, tag := range r.Flags.Filter.Tags {
if err := tag.Validate(); err != nil {
return err
}
}
for _, meta := range r.Flags.Filter.Metadata {
if err := meta.Validate(); err != nil {
return err
}
}
if err := r.Flags.Retry.Validate(); err != nil {
return err
}
remoteEp := r.Target.Endpoint
remoteBkt := r.Target.Bucket
cred := r.Target.Creds
if r.Source.Endpoint != "" {
remoteEp = r.Source.Endpoint
cred = r.Source.Creds
remoteBkt = r.Source.Bucket
}
u, err := url.Parse(remoteEp)
if err != nil {
return err
}
c, err := miniogo.NewCore(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
})
if err != nil {
return err
}
c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
vcfg, err := c.GetBucketVersioning(ctx, remoteBkt)
if err != nil {
if miniogo.ToErrorResponse(err).Code == "NoSuchBucket" {
return batchReplicationJobError{
Code: "NoSuchTargetBucket",
Description: "The specified target bucket does not exist",
HTTPStatusCode: http.StatusNotFound,
}
}
return err
}
// If source has versioning enabled, target must have versioning enabled
if (info.Versioning && !vcfg.Enabled() && !isRemoteToLocal) || (!info.Versioning && vcfg.Enabled() && isRemoteToLocal) {
return batchReplicationJobError{
Code: "InvalidBucketState",
Description: fmt.Sprintf("The source '%s' has versioning enabled, target '%s' must have versioning enabled",
r.Source.Bucket, r.Target.Bucket),
HTTPStatusCode: http.StatusBadRequest,
}
}
r.clnt = c
return nil
}
// Type returns type of batch job, currently only supports 'replicate'
func (j BatchJobRequest) Type() madmin.BatchJobType {
switch {
case j.Replicate != nil:
return madmin.BatchJobReplicate
case j.KeyRotate != nil:
return madmin.BatchJobKeyRotate
}
return madmin.BatchJobType("unknown")
}
// Validate validates the current job, used by 'save()' before
// persisting the job request
func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error {
switch {
case j.Replicate != nil:
return j.Replicate.Validate(ctx, j, o)
case j.KeyRotate != nil:
return j.KeyRotate.Validate(ctx, j, o)
}
return errInvalidArgument
}
func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) {
switch {
case j.Replicate != nil:
deleteConfig(ctx, api, pathJoin(j.Location, batchReplName))
case j.KeyRotate != nil:
deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName))
}
globalBatchJobsMetrics.delete(j.ID)
deleteConfig(ctx, api, j.Location)
}
func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error {
if j.Replicate == nil && j.KeyRotate == nil {
return errInvalidArgument
}
if err := j.Validate(ctx, api); err != nil {
return err
}
j.Location = pathJoin(batchJobPrefix, j.ID)
job, err := j.MarshalMsg(nil)
if err != nil {
return err
}
return saveConfig(ctx, api, j.Location, job)
}
func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string) error {
if j == nil {
return nil
}
job, err := readConfig(ctx, api, name)
if err != nil {
if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
err = errNoSuchJob
}
return err
}
_, err = j.UnmarshalMsg(job)
return err
}
func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
// TODO: support custom storage class for remote replication
putOpts, err = putReplicationOpts(ctx, "", objInfo)
if err != nil {
return putOpts, err
}
putOpts.Internal = miniogo.AdvancedPutOptions{
SourceVersionID: objInfo.VersionID,
SourceMTime: objInfo.ModTime,
SourceETag: objInfo.ETag,
}
return putOpts, nil
}
// ListBatchJobs - lists all currently active batch jobs, optionally takes {jobType}
// input to list only active batch jobs of 'jobType'
func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ListBatchJobs")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ListBatchJobsAction)
if objectAPI == nil {
return
}
jobType := r.Form.Get("jobType")
if jobType == "" {
jobType = string(madmin.BatchJobReplicate)
}
resultCh := make(chan ObjectInfo)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := objectAPI.Walk(ctx, minioMetaBucket, batchJobPrefix, resultCh, ObjectOptions{}); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
listResult := madmin.ListBatchJobsResult{}
for result := range resultCh {
req := &BatchJobRequest{}
if err := req.load(ctx, objectAPI, result.Name); err != nil {
if !errors.Is(err, errNoSuchJob) {
logger.LogIf(ctx, err)
}
continue
}
if jobType == string(req.Type()) {
listResult.Jobs = append(listResult.Jobs, madmin.BatchJobResult{
ID: req.ID,
Type: req.Type(),
Started: req.Started,
User: req.User,
Elapsed: time.Since(req.Started),
})
}
}
logger.LogIf(ctx, json.NewEncoder(w).Encode(&listResult))
}
var errNoSuchJob = errors.New("no such job")
// DescribeBatchJob returns the currently active batch job definition
func (a adminAPIHandlers) DescribeBatchJob(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "DescribeBatchJob")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DescribeBatchJobAction)
if objectAPI == nil {
return
}
id := r.Form.Get("jobId")
if id == "" {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL)
return
}
req := &BatchJobRequest{}
if err := req.load(ctx, objectAPI, pathJoin(batchJobPrefix, id)); err != nil {
if !errors.Is(err, errNoSuchJob) {
logger.LogIf(ctx, err)
}
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
buf, err := yaml.Marshal(req)
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
w.Write(buf)
}
// StarBatchJob queue a new job for execution
func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "StartBatchJob")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, creds := validateAdminReq(ctx, w, r, iampolicy.StartBatchJobAction)
if objectAPI == nil {
return
}
buf, err := io.ReadAll(r.Body)
if err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
user := creds.AccessKey
if creds.ParentUser != "" {
user = creds.ParentUser
}
job := &BatchJobRequest{}
if err = yaml.Unmarshal(buf, job); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
job.ID = shortuuid.New()
job.User = user
job.Started = time.Now()
if err := job.save(ctx, objectAPI); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if err = globalBatchJobPool.queueJob(job); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
buf, err = json.Marshal(&madmin.BatchJobResult{
ID: job.ID,
Type: job.Type(),
Started: job.Started,
User: job.User,
})
if err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
writeSuccessResponseJSON(w, buf)
}
// CancelBatchJob cancels a job in progress
func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "CancelBatchJob")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.CancelBatchJobAction)
if objectAPI == nil {
return
}
jobID := r.Form.Get("id")
if jobID == "" {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL)
return
}
if err := globalBatchJobPool.canceler(jobID, true); err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrInvalidRequest, err), r.URL)
return
}
j := BatchJobRequest{
ID: jobID,
Location: pathJoin(batchJobPrefix, jobID),
}
j.delete(ctx, objectAPI)
writeSuccessNoContent(w)
}
//msgp:ignore BatchJobPool
// BatchJobPool batch job pool
type BatchJobPool struct {
ctx context.Context
objLayer ObjectLayer
once sync.Once
mu sync.Mutex
jobCh chan *BatchJobRequest
jmu sync.Mutex // protects jobCancelers
jobCancelers map[string]context.CancelFunc
workerKillCh chan struct{}
workerSize int
}
var globalBatchJobPool *BatchJobPool
// newBatchJobPool creates a pool of job manifest workers of specified size
func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobPool {
jpool := &BatchJobPool{
ctx: ctx,
objLayer: o,
jobCh: make(chan *BatchJobRequest, 10000),
workerKillCh: make(chan struct{}, workers),
jobCancelers: make(map[string]context.CancelFunc),
}
jpool.ResizeWorkers(workers)
jpool.resume()
return jpool
}
func (j *BatchJobPool) resume() {
results := make(chan ObjectInfo, 100)
ctx, cancel := context.WithCancel(j.ctx)
defer cancel()
if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, ObjectOptions{}); err != nil {
logger.LogIf(j.ctx, err)
return
}
for result := range results {
// ignore batch-replicate.bin and batch-rotate.bin entries
if strings.HasSuffix(result.Name, slashSeparator) {
continue
}
req := &BatchJobRequest{}
if err := req.load(ctx, j.objLayer, result.Name); err != nil {
logger.LogIf(ctx, err)
continue
}
if err := j.queueJob(req); err != nil {
logger.LogIf(ctx, err)
continue
}
}
}
// AddWorker adds a replication worker to the pool
func (j *BatchJobPool) AddWorker() {
if j == nil {
return
}
for {
select {
case <-j.ctx.Done():
return
case job, ok := <-j.jobCh:
if !ok {
return
}
if job.Replicate != nil {
if job.Replicate.RemoteToLocal() {
if err := job.Replicate.StartFromSource(job.ctx, j.objLayer, *job); err != nil {
if !isErrBucketNotFound(err) {
logger.LogIf(j.ctx, err)
j.canceler(job.ID, false)
continue
}
// Bucket not found proceed to delete such a job.
}
} else {
if err := job.Replicate.Start(job.ctx, j.objLayer, *job); err != nil {
if !isErrBucketNotFound(err) {
logger.LogIf(j.ctx, err)
j.canceler(job.ID, false)
continue
}
// Bucket not found proceed to delete such a job.
}
}
}
if job.KeyRotate != nil {
if err := job.KeyRotate.Start(job.ctx, j.objLayer, *job); err != nil {
if !isErrBucketNotFound(err) {
logger.LogIf(j.ctx, err)
continue
}
}
}
job.delete(j.ctx, j.objLayer)
j.canceler(job.ID, false)
case <-j.workerKillCh:
return
}
}
}
// ResizeWorkers sets replication workers pool to new size
func (j *BatchJobPool) ResizeWorkers(n int) {
if j == nil {
return
}
j.mu.Lock()
defer j.mu.Unlock()
for j.workerSize < n {
j.workerSize++
go j.AddWorker()
}
for j.workerSize > n {
j.workerSize--
go func() { j.workerKillCh <- struct{}{} }()
}
}
func (j *BatchJobPool) queueJob(req *BatchJobRequest) error {
if j == nil {
return errInvalidArgument
}
jctx, jcancel := context.WithCancel(j.ctx)
j.jmu.Lock()
j.jobCancelers[req.ID] = jcancel
j.jmu.Unlock()
req.ctx = jctx
select {
case <-j.ctx.Done():
j.once.Do(func() {
close(j.jobCh)
})
case j.jobCh <- req:
default:
return fmt.Errorf("batch job queue is currently full please try again later %#v", req)
}
return nil
}
// delete canceler from the map, cancel job if requested
func (j *BatchJobPool) canceler(jobID string, cancel bool) error {
if j == nil {
return errInvalidArgument
}
j.jmu.Lock()
defer j.jmu.Unlock()
if canceler, ok := j.jobCancelers[jobID]; ok {
if cancel {
canceler()
}
}
delete(j.jobCancelers, jobID)
return nil
}
//msgp:ignore batchJobMetrics
type batchJobMetrics struct {
sync.RWMutex
metrics map[string]*batchJobInfo
}
var globalBatchJobsMetrics = batchJobMetrics{
metrics: make(map[string]*batchJobInfo),
}
//msgp:ignore batchJobMetric
//go:generate stringer -type=batchJobMetric -trimprefix=batchJobMetric $GOFILE
type batchJobMetric uint8
const (
batchReplicationMetricObject batchJobMetric = iota
batchKeyRotationMetricObject
)
func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration time.Duration, info ObjectInfo, attempts int, err error) madmin.TraceInfo {
var errStr string
if err != nil {
errStr = err.Error()
}
jobKind := "batchReplication"
traceType := madmin.TraceBatchReplication
if d == batchKeyRotationMetricObject {
jobKind = "batchKeyRotation"
traceType = madmin.TraceBatchKeyRotation
}
funcName := fmt.Sprintf("%s.%s (job-name=%s)", jobKind, d.String(), job)
if attempts > 0 {
funcName = fmt.Sprintf("%s.%s (job-name=%s,attempts=%s)", jobKind, d.String(), job, humanize.Ordinal(attempts))
}
return madmin.TraceInfo{
TraceType: traceType,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: funcName,
Duration: duration,
Path: info.Name,
Error: errStr,
}
}
func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics) {
metrics = &madmin.BatchJobMetrics{CollectedAt: time.Now(), Jobs: make(map[string]madmin.JobMetric)}
m.RLock()
defer m.RUnlock()
for id, job := range m.metrics {
match := jobID != "" && id == jobID
metrics.Jobs[id] = madmin.JobMetric{
JobID: job.JobID,
JobType: job.JobType,
StartTime: job.StartTime,
LastUpdate: job.LastUpdate,
RetryAttempts: job.RetryAttempts,
Complete: job.Complete,
Failed: job.Failed,
Replicate: &madmin.ReplicateInfo{
Bucket: job.Bucket,
Object: job.Object,
Objects: job.Objects,
ObjectsFailed: job.ObjectsFailed,
BytesTransferred: job.BytesTransferred,
BytesFailed: job.BytesFailed,
},
KeyRotate: &madmin.KeyRotationInfo{
Bucket: job.Bucket,
Object: job.Object,
Objects: job.Objects,
ObjectsFailed: job.ObjectsFailed,
},
}
if match {
break
}
}
return metrics
}
func (m *batchJobMetrics) delete(jobID string) {
m.Lock()
defer m.Unlock()
delete(m.metrics, jobID)
}
func (m *batchJobMetrics) save(jobID string, ri *batchJobInfo) {
m.Lock()
defer m.Unlock()
m.metrics[jobID] = ri.clone()
}
func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int, info ObjectInfo) func(err error) {
startTime := time.Now()
return func(err error) {
duration := time.Since(startTime)
switch d {
case batchReplicationMetricObject:
if globalTrace.NumSubscribers(madmin.TraceBatchReplication) > 0 {
globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err))
}
case batchKeyRotationMetricObject:
if globalTrace.NumSubscribers(madmin.TraceBatchKeyRotation) > 0 {
globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err))
}
}
}
}