// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package cmd import ( "bytes" "context" "encoding/binary" "encoding/json" "errors" "fmt" "io" "math/rand" "net/http" "net/url" "runtime" "strconv" "strings" "sync" "time" "github.com/dustin/go-humanize" "github.com/lithammer/shortuuid/v4" "github.com/minio/madmin-go/v3" "github.com/minio/minio-go/v7" miniogo "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/internal/config/batch" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/console" "github.com/minio/pkg/v2/env" "github.com/minio/pkg/v2/policy" "github.com/minio/pkg/v2/workers" "gopkg.in/yaml.v3" ) var globalBatchConfig batch.Config // BatchJobRequest this is an internal data structure not for external consumption. type BatchJobRequest struct { ID string `yaml:"-" json:"name"` User string `yaml:"-" json:"user"` Started time.Time `yaml:"-" json:"started"` Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"` KeyRotate *BatchJobKeyRotateV1 `yaml:"keyrotate" json:"keyrotate"` Expire *BatchJobExpire `yaml:"expire" json:"expire"` ctx context.Context `msg:"-"` } func notifyEndpoint(ctx context.Context, ri *batchJobInfo, endpoint, token string) error { if endpoint == "" { return nil } buf, err := json.Marshal(ri) if err != nil { return err } ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(buf)) if err != nil { return err } if token != "" { req.Header.Set("Authorization", token) } req.Header.Set("Content-Type", "application/json") clnt := http.Client{Transport: getRemoteInstanceTransport} resp, err := clnt.Do(req) if err != nil { return err } xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return errors.New(resp.Status) } return nil } // Notify notifies notification endpoint if configured regarding job failure or success. func (r BatchJobReplicateV1) Notify(ctx context.Context, ri *batchJobInfo) error { return notifyEndpoint(ctx, ri, r.Flags.Notify.Endpoint, r.Flags.Notify.Token) } // ReplicateFromSource - this is not implemented yet where source is 'remote' and target is local. func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, core *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error { srcBucket := r.Source.Bucket tgtBucket := r.Target.Bucket srcObject := srcObjInfo.Name tgtObject := srcObjInfo.Name if r.Target.Prefix != "" { tgtObject = pathJoin(r.Target.Prefix, srcObjInfo.Name) } versionID := srcObjInfo.VersionID if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { versionID = "" } if srcObjInfo.DeleteMarker { _, err := api.DeleteObject(ctx, tgtBucket, tgtObject, ObjectOptions{ VersionID: versionID, // Since we are preserving a delete marker, we have to make sure this is always true. // regardless of the current configuration of the bucket we must preserve all versions // on the pool being batch replicated from source. Versioned: true, MTime: srcObjInfo.ModTime, DeleteMarker: srcObjInfo.DeleteMarker, ReplicationRequest: true, }) return err } opts := ObjectOptions{ VersionID: srcObjInfo.VersionID, MTime: srcObjInfo.ModTime, PreserveETag: srcObjInfo.ETag, UserDefined: srcObjInfo.UserDefined, } if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { opts.VersionID = "" } if crypto.S3.IsEncrypted(srcObjInfo.UserDefined) { opts.ServerSideEncryption = encrypt.NewSSE() } slc := strings.Split(srcObjInfo.ETag, "-") if len(slc) == 2 { partsCount, err := strconv.Atoi(slc[1]) if err != nil { return err } return r.copyWithMultipartfromSource(ctx, api, core, srcObjInfo, opts, partsCount) } gopts := miniogo.GetObjectOptions{ VersionID: srcObjInfo.VersionID, } if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil { return err } rd, objInfo, _, err := core.GetObject(ctx, srcBucket, srcObject, gopts) if err != nil { return ErrorRespToObjectError(err, srcBucket, srcObject, srcObjInfo.VersionID) } defer rd.Close() hr, err := hash.NewReader(ctx, rd, objInfo.Size, "", "", objInfo.Size) if err != nil { return err } pReader := NewPutObjReader(hr) _, err = api.PutObject(ctx, tgtBucket, tgtObject, pReader, opts) return err } func (r *BatchJobReplicateV1) copyWithMultipartfromSource(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObjInfo ObjectInfo, opts ObjectOptions, partsCount int) (err error) { srcBucket := r.Source.Bucket tgtBucket := r.Target.Bucket srcObject := srcObjInfo.Name tgtObject := srcObjInfo.Name if r.Target.Prefix != "" { tgtObject = pathJoin(r.Target.Prefix, srcObjInfo.Name) } if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { opts.VersionID = "" } var uploadedParts []CompletePart res, err := api.NewMultipartUpload(context.Background(), tgtBucket, tgtObject, opts) if err != nil { return err } defer func() { if err != nil { // block and abort remote upload upon failure. attempts := 1 for attempts <= 3 { aerr := api.AbortMultipartUpload(ctx, tgtBucket, tgtObject, res.UploadID, ObjectOptions{}) if aerr == nil { return } logger.LogIf(ctx, fmt.Errorf("trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster", humanize.Ordinal(attempts), res.UploadID, tgtBucket, tgtObject, aerr)) attempts++ time.Sleep(time.Second) } } }() var ( hr *hash.Reader pInfo PartInfo ) for i := 0; i < partsCount; i++ { gopts := miniogo.GetObjectOptions{ VersionID: srcObjInfo.VersionID, PartNumber: i + 1, } if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil { return err } rd, objInfo, _, err := c.GetObject(ctx, srcBucket, srcObject, gopts) if err != nil { return ErrorRespToObjectError(err, srcBucket, srcObject, srcObjInfo.VersionID) } defer rd.Close() hr, err = hash.NewReader(ctx, io.LimitReader(rd, objInfo.Size), objInfo.Size, "", "", objInfo.Size) if err != nil { return err } pReader := NewPutObjReader(hr) opts.PreserveETag = "" pInfo, err = api.PutObjectPart(ctx, tgtBucket, tgtObject, res.UploadID, i+1, pReader, opts) if err != nil { return err } if pInfo.Size != objInfo.Size { return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, objInfo.Size) } uploadedParts = append(uploadedParts, CompletePart{ PartNumber: pInfo.PartNumber, ETag: pInfo.ETag, }) } _, err = api.CompleteMultipartUpload(ctx, tgtBucket, tgtObject, res.UploadID, uploadedParts, opts) return err } // StartFromSource starts the batch replication job from remote source, resumes if there was a pending job via "job.ID" func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { ri := &batchJobInfo{ JobID: job.ID, JobType: string(job.Type()), StartTime: job.Started, } if err := ri.load(ctx, api, job); err != nil { return err } if ri.Complete { return nil } globalBatchJobsMetrics.save(job.ID, ri) delay := job.Replicate.Flags.Retry.Delay if delay == 0 { delay = batchReplJobDefaultRetryDelay } rnd := rand.New(rand.NewSource(time.Now().UnixNano())) hasTags := len(r.Flags.Filter.Tags) != 0 isMetadata := len(r.Flags.Filter.Metadata) != 0 isStorageClassOnly := len(r.Flags.Filter.Metadata) == 1 && strings.EqualFold(r.Flags.Filter.Metadata[0].Key, xhttp.AmzStorageClass) skip := func(oi ObjectInfo) (ok bool) { if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan { // skip all objects that are newer than specified older duration return true } if r.Flags.Filter.NewerThan > 0 && time.Since(oi.ModTime) >= r.Flags.Filter.NewerThan { // skip all objects that are older than specified newer duration return true } if !r.Flags.Filter.CreatedAfter.IsZero() && r.Flags.Filter.CreatedAfter.Before(oi.ModTime) { // skip all objects that are created before the specified time. return true } if !r.Flags.Filter.CreatedBefore.IsZero() && r.Flags.Filter.CreatedBefore.After(oi.ModTime) { // skip all objects that are created after the specified time. return true } if hasTags { // Only parse object tags if tags filter is specified. tagMap := map[string]string{} tagStr := oi.UserTags if len(tagStr) != 0 { t, err := tags.ParseObjectTags(tagStr) if err != nil { return false } tagMap = t.ToMap() } for _, kv := range r.Flags.Filter.Tags { for t, v := range tagMap { if kv.Match(BatchJobKV{Key: t, Value: v}) { return true } } } // None of the provided tags filter match skip the object return false } for _, kv := range r.Flags.Filter.Metadata { for k, v := range oi.UserDefined { if !stringsHasPrefixFold(k, "x-amz-meta-") && !isStandardHeader(k) { continue } // We only need to match x-amz-meta or standardHeaders if kv.Match(BatchJobKV{Key: k, Value: v}) { return true } } } // None of the provided filters match return false } u, err := url.Parse(r.Source.Endpoint) if err != nil { return err } cred := r.Source.Creds c, err := miniogo.New(u.Host, &miniogo.Options{ Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), Secure: u.Scheme == "https", Transport: getRemoteInstanceTransport, BucketLookup: lookupStyle(r.Source.Path), }) if err != nil { return err } c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) core := &miniogo.Core{Client: c} workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2))) if err != nil { return err } wk, err := workers.New(workerSize) if err != nil { // invalid worker size. return err } retryAttempts := ri.RetryAttempts retry := false for attempts := 1; attempts <= retryAttempts; attempts++ { attempts := attempts // one of source/target is s3, skip delete marker and all versions under the same object name. s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 minioSrc := r.Source.Type == BatchJobReplicateResourceMinIO ctx, cancel := context.WithCancel(ctx) objInfoCh := c.ListObjects(ctx, r.Source.Bucket, miniogo.ListObjectsOptions{ Prefix: r.Source.Prefix, WithVersions: minioSrc, Recursive: true, WithMetadata: true, }) prevObj := "" skipReplicate := false for obj := range objInfoCh { oi := toObjectInfo(r.Source.Bucket, obj.Key, obj) if !minioSrc { // Check if metadata filter was requested and it is expected to have // all user metadata or just storageClass. If its only storageClass // List() already returns relevant information for filter to be applied. if isMetadata && !isStorageClassOnly { oi2, err := c.StatObject(ctx, r.Source.Bucket, obj.Key, miniogo.StatObjectOptions{}) if err == nil { oi = toObjectInfo(r.Source.Bucket, obj.Key, oi2) } else { if !isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) && !isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) { logger.LogIf(ctx, err) } continue } } if hasTags { tags, err := c.GetObjectTagging(ctx, r.Source.Bucket, obj.Key, minio.GetObjectTaggingOptions{}) if err == nil { oi.UserTags = tags.String() } else { if !isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) && !isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) { logger.LogIf(ctx, err) } continue } } } if skip(oi) { continue } if obj.Key != prevObj { prevObj = obj.Key // skip replication of delete marker and all versions under the same object name if one of source or target is s3. skipReplicate = obj.IsDeleteMarker && s3Type } if skipReplicate { continue } wk.Take() go func() { defer wk.Give() stopFn := globalBatchJobsMetrics.trace(batchJobMetricReplication, job.ID, attempts) success := true if err := r.ReplicateFromSource(ctx, api, core, oi, retry); err != nil { // object must be deleted concurrently, allow these failures but do not count them if isErrVersionNotFound(err) || isErrObjectNotFound(err) { return } stopFn(oi, err) logger.LogIf(ctx, err) success = false } else { stopFn(oi, nil) } ri.trackCurrentBucketObject(r.Target.Bucket, oi, success) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) if wait := globalBatchConfig.ReplicationWait(); wait > 0 { time.Sleep(wait) } }() } wk.Wait() ri.RetryAttempts = attempts ri.Complete = ri.ObjectsFailed == 0 ri.Failed = ri.ObjectsFailed > 0 globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk. logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job)) if err := r.Notify(ctx, ri); err != nil { logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err)) } cancel() if ri.Failed { ri.ObjectsFailed = 0 ri.Bucket = "" ri.Object = "" ri.Objects = 0 ri.BytesFailed = 0 ri.BytesTransferred = 0 retry = true // indicate we are retrying.. time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay))) continue } break } return nil } // toObjectInfo converts minio.ObjectInfo to ObjectInfo func toObjectInfo(bucket, object string, objInfo miniogo.ObjectInfo) ObjectInfo { tags, _ := tags.MapToObjectTags(objInfo.UserTags) oi := ObjectInfo{ Bucket: bucket, Name: object, ModTime: objInfo.LastModified, Size: objInfo.Size, ETag: objInfo.ETag, VersionID: objInfo.VersionID, IsLatest: objInfo.IsLatest, DeleteMarker: objInfo.IsDeleteMarker, ContentType: objInfo.ContentType, Expires: objInfo.Expires, StorageClass: objInfo.StorageClass, ReplicationStatusInternal: objInfo.ReplicationStatus, UserTags: tags.String(), } oi.UserDefined = make(map[string]string, len(objInfo.Metadata)) for k, v := range objInfo.Metadata { oi.UserDefined[k] = v[0] } ce, ok := oi.UserDefined[xhttp.ContentEncoding] if !ok { ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)] } if ok { oi.ContentEncoding = ce } _, ok = oi.UserDefined[xhttp.AmzStorageClass] if !ok { oi.UserDefined[xhttp.AmzStorageClass] = objInfo.StorageClass } for k, v := range objInfo.UserMetadata { oi.UserDefined[k] = v } return oi } func (r BatchJobReplicateV1) writeAsArchive(ctx context.Context, objAPI ObjectLayer, remoteClnt *minio.Client, entries []ObjectInfo) error { input := make(chan minio.SnowballObject, 1) opts := minio.SnowballOptions{ Opts: minio.PutObjectOptions{}, InMemory: *r.Source.Snowball.InMemory, Compress: *r.Source.Snowball.Compress, SkipErrs: *r.Source.Snowball.SkipErrs, } go func() { defer xioutil.SafeClose(input) for _, entry := range entries { gr, err := objAPI.GetObjectNInfo(ctx, r.Source.Bucket, entry.Name, nil, nil, ObjectOptions{ VersionID: entry.VersionID, }) if err != nil { logger.LogIf(ctx, err) continue } snowballObj := minio.SnowballObject{ // Create path to store objects within the bucket. Key: entry.Name, Size: entry.Size, ModTime: entry.ModTime, VersionID: entry.VersionID, Content: gr, Headers: make(http.Header), Close: func() { gr.Close() }, } opts, err := batchReplicationOpts(ctx, "", gr.ObjInfo) if err != nil { logger.LogIf(ctx, err) continue } for k, vals := range opts.Header() { for _, v := range vals { snowballObj.Headers.Add(k, v) } } input <- snowballObj } }() // Collect and upload all entries. return remoteClnt.PutObjectsSnowball(ctx, r.Target.Bucket, opts, input) } // ReplicateToTarget read from source and replicate to configured target func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error { srcBucket := r.Source.Bucket tgtBucket := r.Target.Bucket tgtPrefix := r.Target.Prefix srcObject := srcObjInfo.Name s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 if srcObjInfo.DeleteMarker || !srcObjInfo.VersionPurgeStatus.Empty() { if retry && !s3Type { if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.StatObjectOptions{ VersionID: srcObjInfo.VersionID, Internal: miniogo.AdvancedGetOptions{ ReplicationProxyRequest: "false", }, }); isErrMethodNotAllowed(ErrorRespToObjectError(err, tgtBucket, pathJoin(tgtPrefix, srcObject))) { return nil } } versionID := srcObjInfo.VersionID dmVersionID := "" if srcObjInfo.VersionPurgeStatus.Empty() { dmVersionID = srcObjInfo.VersionID } if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { dmVersionID = "" versionID = "" } return c.RemoveObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.RemoveObjectOptions{ VersionID: versionID, Internal: miniogo.AdvancedRemoveOptions{ ReplicationDeleteMarker: dmVersionID != "", ReplicationMTime: srcObjInfo.ModTime, ReplicationStatus: miniogo.ReplicationStatusReplica, ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside }, }) } if retry && !s3Type { // when we are retrying avoid copying if necessary. gopts := miniogo.GetObjectOptions{} if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil { return err } if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), gopts); err == nil { return nil } } versioned := globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) versionSuspended := globalBucketVersioningSys.PrefixSuspended(srcBucket, srcObject) opts := ObjectOptions{ VersionID: srcObjInfo.VersionID, Versioned: versioned, VersionSuspended: versionSuspended, } rd, err := api.GetObjectNInfo(ctx, srcBucket, srcObject, nil, http.Header{}, opts) if err != nil { return err } defer rd.Close() objInfo := rd.ObjInfo size, err := objInfo.GetActualSize() if err != nil { return err } putOpts, err := batchReplicationOpts(ctx, "", objInfo) if err != nil { return err } if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { putOpts.Internal = miniogo.AdvancedPutOptions{} } if objInfo.isMultipart() { if err := replicateObjectWithMultipart(ctx, c, tgtBucket, pathJoin(tgtPrefix, objInfo.Name), rd, objInfo, putOpts); err != nil { return err } } else { if _, err = c.PutObject(ctx, tgtBucket, pathJoin(tgtPrefix, objInfo.Name), rd, size, "", "", putOpts); err != nil { return err } } return nil } //go:generate msgp -file $GOFILE -unexported // batchJobInfo current batch replication information type batchJobInfo struct { mu sync.RWMutex `json:"-" msg:"-"` Version int `json:"-" msg:"v"` JobID string `json:"jobID" msg:"jid"` JobType string `json:"jobType" msg:"jt"` StartTime time.Time `json:"startTime" msg:"st"` LastUpdate time.Time `json:"lastUpdate" msg:"lu"` RetryAttempts int `json:"retryAttempts" msg:"ra"` Complete bool `json:"complete" msg:"cmp"` Failed bool `json:"failed" msg:"fld"` // Last bucket/object batch replicated Bucket string `json:"-" msg:"lbkt"` Object string `json:"-" msg:"lobj"` // Verbose information Objects int64 `json:"objects" msg:"ob"` DeleteMarkers int64 `json:"deleteMarkers" msg:"dm"` ObjectsFailed int64 `json:"objectsFailed" msg:"obf"` DeleteMarkersFailed int64 `json:"deleteMarkersFailed" msg:"dmf"` BytesTransferred int64 `json:"bytesTransferred" msg:"bt"` BytesFailed int64 `json:"bytesFailed" msg:"bf"` } const ( batchReplName = "batch-replicate.bin" batchReplFormat = 1 batchReplVersionV1 = 1 batchReplVersion = batchReplVersionV1 batchJobName = "job.bin" batchJobPrefix = "batch-jobs" batchJobReportsPrefix = batchJobPrefix + "/reports" batchReplJobAPIVersion = "v1" batchReplJobDefaultRetries = 3 batchReplJobDefaultRetryDelay = 250 * time.Millisecond ) func getJobReportPath(job BatchJobRequest) string { var fileName string switch { case job.Replicate != nil: fileName = batchReplName case job.KeyRotate != nil: fileName = batchKeyRotationName case job.Expire != nil: fileName = batchExpireName } return pathJoin(batchJobReportsPrefix, job.ID, fileName) } func getJobPath(job BatchJobRequest) string { return pathJoin(batchJobPrefix, job.ID) } func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { var format, version uint16 switch { case job.Replicate != nil: version = batchReplVersionV1 format = batchReplFormat case job.KeyRotate != nil: version = batchKeyRotateVersionV1 format = batchKeyRotationFormat case job.Expire != nil: version = batchExpireVersionV1 format = batchExpireFormat default: return errors.New("no supported batch job request specified") } data, err := readConfig(ctx, api, getJobReportPath(job)) if err != nil { if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) { ri.Version = int(version) switch { case job.Replicate != nil: ri.RetryAttempts = batchReplJobDefaultRetries if job.Replicate.Flags.Retry.Attempts > 0 { ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts } case job.KeyRotate != nil: ri.RetryAttempts = batchKeyRotateJobDefaultRetries if job.KeyRotate.Flags.Retry.Attempts > 0 { ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts } case job.Expire != nil: ri.RetryAttempts = batchExpireJobDefaultRetries if job.Expire.Retry.Attempts > 0 { ri.RetryAttempts = job.Expire.Retry.Attempts } } return nil } return err } if len(data) == 0 { // Seems to be empty create a new batchRepl object. return nil } if len(data) <= 4 { return fmt.Errorf("%s: no data", ri.JobType) } // Read header switch binary.LittleEndian.Uint16(data[0:2]) { case format: default: return fmt.Errorf("%s: unknown format: %d", ri.JobType, binary.LittleEndian.Uint16(data[0:2])) } switch binary.LittleEndian.Uint16(data[2:4]) { case version: default: return fmt.Errorf("%s: unknown version: %d", ri.JobType, binary.LittleEndian.Uint16(data[2:4])) } ri.mu.Lock() defer ri.mu.Unlock() // OK, parse data. if _, err = ri.UnmarshalMsg(data[4:]); err != nil { return err } switch ri.Version { case batchReplVersionV1: default: return fmt.Errorf("unexpected batch %s meta version: %d", ri.JobType, ri.Version) } return nil } func (ri *batchJobInfo) clone() *batchJobInfo { ri.mu.RLock() defer ri.mu.RUnlock() return &batchJobInfo{ Version: ri.Version, JobID: ri.JobID, JobType: ri.JobType, RetryAttempts: ri.RetryAttempts, Complete: ri.Complete, Failed: ri.Failed, StartTime: ri.StartTime, LastUpdate: ri.LastUpdate, Bucket: ri.Bucket, Object: ri.Object, Objects: ri.Objects, ObjectsFailed: ri.ObjectsFailed, BytesTransferred: ri.BytesTransferred, BytesFailed: ri.BytesFailed, } } func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) { if ri == nil { return } if success { if dmarker { ri.DeleteMarkers++ } else { ri.Objects++ ri.BytesTransferred += size } } else { if dmarker { ri.DeleteMarkersFailed++ } else { ri.ObjectsFailed++ ri.BytesFailed += size } } } func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, duration time.Duration, job BatchJobRequest) error { if ri == nil { return errInvalidArgument } now := UTCNow() ri.mu.Lock() var ( format, version uint16 jobTyp string ) if now.Sub(ri.LastUpdate) >= duration { switch job.Type() { case madmin.BatchJobReplicate: format = batchReplFormat version = batchReplVersion jobTyp = string(job.Type()) ri.Version = batchReplVersionV1 case madmin.BatchJobKeyRotate: format = batchKeyRotationFormat version = batchKeyRotateVersion jobTyp = string(job.Type()) ri.Version = batchKeyRotateVersionV1 case madmin.BatchJobExpire: format = batchExpireFormat version = batchExpireVersion jobTyp = string(job.Type()) ri.Version = batchExpireVersionV1 default: return errInvalidArgument } if serverDebugLog { console.Debugf("%s: persisting info on drive: threshold:%s, %s:%#v\n", jobTyp, now.Sub(ri.LastUpdate), jobTyp, ri) } ri.LastUpdate = now data := make([]byte, 4, ri.Msgsize()+4) // Initialize the header. binary.LittleEndian.PutUint16(data[0:2], format) binary.LittleEndian.PutUint16(data[2:4], version) buf, err := ri.MarshalMsg(data) ri.mu.Unlock() if err != nil { return err } return saveConfig(ctx, api, getJobReportPath(job), buf) } ri.mu.Unlock() return nil } // Note: to be used only with batch jobs that affect multiple versions through // a single action. e.g batch-expire has an option to expire all versions of an // object which matches the given filters. func (ri *batchJobInfo) trackMultipleObjectVersions(bucket string, info ObjectInfo, success bool) { if success { ri.Objects += int64(info.NumVersions) } else { ri.ObjectsFailed += int64(info.NumVersions) } } func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool) { if ri == nil { return } ri.mu.Lock() defer ri.mu.Unlock() ri.Bucket = bucket ri.Object = info.Name ri.countItem(info.Size, info.DeleteMarker, success) } func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInfo) { if ri == nil { return } ri.mu.Lock() defer ri.mu.Unlock() ri.Bucket = bucket for i := range batch { ri.Object = batch[i].Name ri.countItem(batch[i].Size, batch[i].DeleteMarker, true) } } // Start start the batch replication job, resumes if there was a pending job via "job.ID" func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { ri := &batchJobInfo{ JobID: job.ID, JobType: string(job.Type()), StartTime: job.Started, } if err := ri.load(ctx, api, job); err != nil { return err } if ri.Complete { return nil } globalBatchJobsMetrics.save(job.ID, ri) lastObject := ri.Object delay := job.Replicate.Flags.Retry.Delay if delay == 0 { delay = batchReplJobDefaultRetryDelay } rnd := rand.New(rand.NewSource(time.Now().UnixNano())) selectObj := func(info FileInfo) (ok bool) { if r.Flags.Filter.OlderThan > 0 && time.Since(info.ModTime) < r.Flags.Filter.OlderThan { // skip all objects that are newer than specified older duration return false } if r.Flags.Filter.NewerThan > 0 && time.Since(info.ModTime) >= r.Flags.Filter.NewerThan { // skip all objects that are older than specified newer duration return false } if !r.Flags.Filter.CreatedAfter.IsZero() && r.Flags.Filter.CreatedAfter.Before(info.ModTime) { // skip all objects that are created before the specified time. return false } if !r.Flags.Filter.CreatedBefore.IsZero() && r.Flags.Filter.CreatedBefore.After(info.ModTime) { // skip all objects that are created after the specified time. return false } if len(r.Flags.Filter.Tags) > 0 { // Only parse object tags if tags filter is specified. tagMap := map[string]string{} tagStr := info.Metadata[xhttp.AmzObjectTagging] if len(tagStr) != 0 { t, err := tags.ParseObjectTags(tagStr) if err != nil { return false } tagMap = t.ToMap() } for _, kv := range r.Flags.Filter.Tags { for t, v := range tagMap { if kv.Match(BatchJobKV{Key: t, Value: v}) { return true } } } // None of the provided tags filter match skip the object return false } if len(r.Flags.Filter.Metadata) > 0 { for _, kv := range r.Flags.Filter.Metadata { for k, v := range info.Metadata { if !stringsHasPrefixFold(k, "x-amz-meta-") && !isStandardHeader(k) { continue } // We only need to match x-amz-meta or standardHeaders if kv.Match(BatchJobKV{Key: k, Value: v}) { return true } } } // None of the provided metadata filters match skip the object. return false } // if one of source or target is non MinIO, just replicate the top most version like `mc mirror` return !((r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3) && !info.IsLatest) } u, err := url.Parse(r.Target.Endpoint) if err != nil { return err } cred := r.Target.Creds c, err := miniogo.NewCore(u.Host, &miniogo.Options{ Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), Secure: u.Scheme == "https", Transport: getRemoteInstanceTransport, BucketLookup: lookupStyle(r.Target.Path), }) if err != nil { return err } c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) var ( walkCh = make(chan ObjectInfo, 100) slowCh = make(chan ObjectInfo, 100) ) if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { go func() { // Snowball currently needs the high level minio-go Client, not the Core one cl, err := miniogo.New(u.Host, &miniogo.Options{ Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), Secure: u.Scheme == "https", Transport: getRemoteInstanceTransport, BucketLookup: lookupStyle(r.Target.Path), }) if err != nil { logger.LogIf(ctx, err) return } // Already validated before arriving here smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan) batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch) writeFn := func(batch []ObjectInfo) { if len(batch) > 0 { if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { logger.LogIf(ctx, err) for _, b := range batch { slowCh <- b } } else { ri.trackCurrentBucketBatch(r.Source.Bucket, batch) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) } } } for obj := range walkCh { if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) { slowCh <- obj continue } batch = append(batch, obj) if len(batch) < *r.Source.Snowball.Batch { continue } writeFn(batch) batch = batch[:0] } writeFn(batch) xioutil.SafeClose(slowCh) }() } else { slowCh = walkCh } workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2))) if err != nil { return err } wk, err := workers.New(workerSize) if err != nil { // invalid worker size. return err } walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict") if walkQuorum == "" { walkQuorum = "strict" } retryAttempts := ri.RetryAttempts retry := false for attempts := 1; attempts <= retryAttempts; attempts++ { attempts := attempts ctx, cancel := context.WithCancel(ctx) // one of source/target is s3, skip delete marker and all versions under the same object name. s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, walkCh, WalkOptions{ Marker: lastObject, Filter: selectObj, AskDisks: walkQuorum, }); err != nil { cancel() // Do not need to retry if we can't list objects on source. return err } prevObj := "" skipReplicate := false for result := range slowCh { result := result if result.Name != prevObj { prevObj = result.Name skipReplicate = result.DeleteMarker && s3Type } if skipReplicate { continue } wk.Take() go func() { defer wk.Give() stopFn := globalBatchJobsMetrics.trace(batchJobMetricReplication, job.ID, attempts) success := true if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil { if miniogo.ToErrorResponse(err).Code == "PreconditionFailed" { // pre-condition failed means we already have the object copied over. return } // object must be deleted concurrently, allow these failures but do not count them if isErrVersionNotFound(err) || isErrObjectNotFound(err) { return } stopFn(result, err) logger.LogIf(ctx, err) success = false } else { stopFn(result, nil) } ri.trackCurrentBucketObject(r.Source.Bucket, result, success) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) if wait := globalBatchConfig.ReplicationWait(); wait > 0 { time.Sleep(wait) } }() } wk.Wait() ri.RetryAttempts = attempts ri.Complete = ri.ObjectsFailed == 0 ri.Failed = ri.ObjectsFailed > 0 globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk. logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job)) if err := r.Notify(ctx, ri); err != nil { logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err)) } cancel() if ri.Failed { ri.ObjectsFailed = 0 ri.Bucket = "" ri.Object = "" ri.Objects = 0 ri.BytesFailed = 0 ri.BytesTransferred = 0 retry = true // indicate we are retrying.. time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay))) continue } break } return nil } //msgp:ignore batchReplicationJobError type batchReplicationJobError struct { Code string Description string HTTPStatusCode int } func (e batchReplicationJobError) Error() string { return e.Description } // Validate validates the job definition input func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest, o ObjectLayer) error { if r == nil { return nil } if r.APIVersion != batchReplJobAPIVersion { return errInvalidArgument } if r.Source.Bucket == "" { return errInvalidArgument } var isRemoteToLocal bool localBkt := r.Source.Bucket if r.Source.Endpoint != "" { localBkt = r.Target.Bucket isRemoteToLocal = true } info, err := o.GetBucketInfo(ctx, localBkt, BucketOptions{}) if err != nil { if isErrBucketNotFound(err) { return batchReplicationJobError{ Code: "NoSuchSourceBucket", Description: fmt.Sprintf("The specified bucket %s does not exist", localBkt), HTTPStatusCode: http.StatusNotFound, } } return err } if err := r.Source.Type.Validate(); err != nil { return err } if err := r.Source.Snowball.Validate(); err != nil { return err } if r.Source.Creds.Empty() && r.Target.Creds.Empty() { return errInvalidArgument } if !r.Source.Creds.Empty() { if err := r.Source.Creds.Validate(); err != nil { return err } } if r.Target.Endpoint == "" && !r.Target.Creds.Empty() { return errInvalidArgument } if r.Source.Endpoint == "" && !r.Source.Creds.Empty() { return errInvalidArgument } if r.Source.Endpoint != "" && !r.Source.Type.isMinio() && !r.Source.ValidPath() { return errInvalidArgument } if r.Target.Endpoint != "" && !r.Target.Type.isMinio() && !r.Target.ValidPath() { return errInvalidArgument } if r.Target.Bucket == "" { return errInvalidArgument } if !r.Target.Creds.Empty() { if err := r.Target.Creds.Validate(); err != nil { return err } } if r.Source.Creds.Empty() && r.Target.Creds.Empty() { return errInvalidArgument } if err := r.Target.Type.Validate(); err != nil { return err } for _, tag := range r.Flags.Filter.Tags { if err := tag.Validate(); err != nil { return err } } for _, meta := range r.Flags.Filter.Metadata { if err := meta.Validate(); err != nil { return err } } if err := r.Flags.Retry.Validate(); err != nil { return err } remoteEp := r.Target.Endpoint remoteBkt := r.Target.Bucket cred := r.Target.Creds pathStyle := r.Target.Path if r.Source.Endpoint != "" { remoteEp = r.Source.Endpoint cred = r.Source.Creds remoteBkt = r.Source.Bucket pathStyle = r.Source.Path } u, err := url.Parse(remoteEp) if err != nil { return err } c, err := miniogo.NewCore(u.Host, &miniogo.Options{ Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), Secure: u.Scheme == "https", Transport: getRemoteInstanceTransport, BucketLookup: lookupStyle(pathStyle), }) if err != nil { return err } c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) vcfg, err := c.GetBucketVersioning(ctx, remoteBkt) if err != nil { if miniogo.ToErrorResponse(err).Code == "NoSuchBucket" { return batchReplicationJobError{ Code: "NoSuchTargetBucket", Description: "The specified target bucket does not exist", HTTPStatusCode: http.StatusNotFound, } } return err } // if both source and target are minio instances minioType := r.Target.Type == BatchJobReplicateResourceMinIO && r.Source.Type == BatchJobReplicateResourceMinIO // If source has versioning enabled, target must have versioning enabled if minioType && ((info.Versioning && !vcfg.Enabled() && !isRemoteToLocal) || (!info.Versioning && vcfg.Enabled() && isRemoteToLocal)) { return batchReplicationJobError{ Code: "InvalidBucketState", Description: fmt.Sprintf("The source '%s' has versioning enabled, target '%s' must have versioning enabled", r.Source.Bucket, r.Target.Bucket), HTTPStatusCode: http.StatusBadRequest, } } r.clnt = c return nil } // Type returns type of batch job, currently only supports 'replicate' func (j BatchJobRequest) Type() madmin.BatchJobType { switch { case j.Replicate != nil: return madmin.BatchJobReplicate case j.KeyRotate != nil: return madmin.BatchJobKeyRotate case j.Expire != nil: return madmin.BatchJobExpire } return madmin.BatchJobType("unknown") } // Validate validates the current job, used by 'save()' before // persisting the job request func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error { switch { case j.Replicate != nil: return j.Replicate.Validate(ctx, j, o) case j.KeyRotate != nil: return j.KeyRotate.Validate(ctx, j, o) case j.Expire != nil: return j.Expire.Validate(ctx, j, o) } return errInvalidArgument } func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) { deleteConfig(ctx, api, getJobReportPath(j)) deleteConfig(ctx, api, getJobPath(j)) } func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error { if j.Replicate == nil && j.KeyRotate == nil && j.Expire == nil { return errInvalidArgument } if err := j.Validate(ctx, api); err != nil { return err } job, err := j.MarshalMsg(nil) if err != nil { return err } return saveConfig(ctx, api, getJobPath(*j), job) } func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string) error { if j == nil { return nil } job, err := readConfig(ctx, api, name) if err != nil { if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) { err = errNoSuchJob } return err } _, err = j.UnmarshalMsg(job) return err } func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) { // TODO: support custom storage class for remote replication putOpts, err = putReplicationOpts(ctx, "", objInfo) if err != nil { return putOpts, err } putOpts.Internal = miniogo.AdvancedPutOptions{ SourceVersionID: objInfo.VersionID, SourceMTime: objInfo.ModTime, SourceETag: objInfo.ETag, ReplicationRequest: true, } return putOpts, nil } // ListBatchJobs - lists all currently active batch jobs, optionally takes {jobType} // input to list only active batch jobs of 'jobType' func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request) { ctx := r.Context() objectAPI, _ := validateAdminReq(ctx, w, r, policy.ListBatchJobsAction) if objectAPI == nil { return } jobType := r.Form.Get("jobType") if jobType == "" { jobType = string(madmin.BatchJobReplicate) } resultCh := make(chan ObjectInfo) ctx, cancel := context.WithCancel(ctx) defer cancel() if err := objectAPI.Walk(ctx, minioMetaBucket, batchJobPrefix, resultCh, WalkOptions{}); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } listResult := madmin.ListBatchJobsResult{} for result := range resultCh { req := &BatchJobRequest{} if err := req.load(ctx, objectAPI, result.Name); err != nil { if !errors.Is(err, errNoSuchJob) { logger.LogIf(ctx, err) } continue } if jobType == string(req.Type()) { listResult.Jobs = append(listResult.Jobs, madmin.BatchJobResult{ ID: req.ID, Type: req.Type(), Started: req.Started, User: req.User, Elapsed: time.Since(req.Started), }) } } logger.LogIf(ctx, json.NewEncoder(w).Encode(&listResult)) } var errNoSuchJob = errors.New("no such job") // DescribeBatchJob returns the currently active batch job definition func (a adminAPIHandlers) DescribeBatchJob(w http.ResponseWriter, r *http.Request) { ctx := r.Context() objectAPI, _ := validateAdminReq(ctx, w, r, policy.DescribeBatchJobAction) if objectAPI == nil { return } jobID := r.Form.Get("jobId") if jobID == "" { writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) return } req := &BatchJobRequest{} if err := req.load(ctx, objectAPI, pathJoin(batchJobPrefix, jobID)); err != nil { if !errors.Is(err, errNoSuchJob) { logger.LogIf(ctx, err) } writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } buf, err := yaml.Marshal(req) if err != nil { logger.LogIf(ctx, err) writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } w.Write(buf) } // StarBatchJob queue a new job for execution func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request) { ctx := r.Context() objectAPI, creds := validateAdminReq(ctx, w, r, policy.StartBatchJobAction) if objectAPI == nil { return } buf, err := io.ReadAll(ioutil.HardLimitReader(r.Body, humanize.MiByte*4)) if err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } user := creds.AccessKey if creds.ParentUser != "" { user = creds.ParentUser } job := &BatchJobRequest{} if err = yaml.Unmarshal(buf, job); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } // Fill with default values if job.Replicate != nil { if job.Replicate.Source.Snowball.Disable == nil { job.Replicate.Source.Snowball.Disable = ptr(false) } if job.Replicate.Source.Snowball.Batch == nil { job.Replicate.Source.Snowball.Batch = ptr(100) } if job.Replicate.Source.Snowball.InMemory == nil { job.Replicate.Source.Snowball.InMemory = ptr(true) } if job.Replicate.Source.Snowball.Compress == nil { job.Replicate.Source.Snowball.Compress = ptr(false) } if job.Replicate.Source.Snowball.SmallerThan == nil { job.Replicate.Source.Snowball.SmallerThan = ptr("5MiB") } if job.Replicate.Source.Snowball.SkipErrs == nil { job.Replicate.Source.Snowball.SkipErrs = ptr(true) } } // Validate the incoming job request if err := job.Validate(ctx, objectAPI); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } job.ID = fmt.Sprintf("%s:%d", shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints)) job.User = user job.Started = time.Now() if err := job.save(ctx, objectAPI); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } if err = globalBatchJobPool.queueJob(job); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } buf, err = json.Marshal(&madmin.BatchJobResult{ ID: job.ID, Type: job.Type(), Started: job.Started, User: job.User, }) if err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } writeSuccessResponseJSON(w, buf) } // CancelBatchJob cancels a job in progress func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request) { ctx := r.Context() objectAPI, _ := validateAdminReq(ctx, w, r, policy.CancelBatchJobAction) if objectAPI == nil { return } jobID := r.Form.Get("id") if jobID == "" { writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) return } if _, success := proxyRequestByToken(ctx, w, r, jobID); success { return } if err := globalBatchJobPool.canceler(jobID, true); err != nil { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrInvalidRequest, err), r.URL) return } j := BatchJobRequest{ ID: jobID, } j.delete(ctx, objectAPI) writeSuccessNoContent(w) } //msgp:ignore BatchJobPool // BatchJobPool batch job pool type BatchJobPool struct { ctx context.Context objLayer ObjectLayer once sync.Once mu sync.Mutex jobCh chan *BatchJobRequest jmu sync.Mutex // protects jobCancelers jobCancelers map[string]context.CancelFunc workerKillCh chan struct{} workerSize int } var globalBatchJobPool *BatchJobPool // newBatchJobPool creates a pool of job manifest workers of specified size func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobPool { jpool := &BatchJobPool{ ctx: ctx, objLayer: o, jobCh: make(chan *BatchJobRequest, 10000), workerKillCh: make(chan struct{}, workers), jobCancelers: make(map[string]context.CancelFunc), } jpool.ResizeWorkers(workers) jpool.resume() return jpool } func (j *BatchJobPool) resume() { results := make(chan ObjectInfo, 100) ctx, cancel := context.WithCancel(j.ctx) defer cancel() if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, WalkOptions{}); err != nil { logger.LogIf(j.ctx, err) return } for result := range results { // ignore batch-replicate.bin and batch-rotate.bin entries if strings.HasSuffix(result.Name, slashSeparator) { continue } req := &BatchJobRequest{} if err := req.load(ctx, j.objLayer, result.Name); err != nil { logger.LogIf(ctx, err) continue } _, nodeIdx := parseRequestToken(req.ID) if nodeIdx > -1 && GetProxyEndpointLocalIndex(globalProxyEndpoints) != nodeIdx { // This job doesn't belong on this node. continue } if err := j.queueJob(req); err != nil { logger.LogIf(ctx, err) continue } } } // AddWorker adds a replication worker to the pool func (j *BatchJobPool) AddWorker() { if j == nil { return } for { select { case <-j.ctx.Done(): return case job, ok := <-j.jobCh: if !ok { return } switch { case job.Replicate != nil: if job.Replicate.RemoteToLocal() { if err := job.Replicate.StartFromSource(job.ctx, j.objLayer, *job); err != nil { if !isErrBucketNotFound(err) { logger.LogIf(j.ctx, err) j.canceler(job.ID, false) continue } // Bucket not found proceed to delete such a job. } } else { if err := job.Replicate.Start(job.ctx, j.objLayer, *job); err != nil { if !isErrBucketNotFound(err) { logger.LogIf(j.ctx, err) j.canceler(job.ID, false) continue } // Bucket not found proceed to delete such a job. } } case job.KeyRotate != nil: if err := job.KeyRotate.Start(job.ctx, j.objLayer, *job); err != nil { if !isErrBucketNotFound(err) { logger.LogIf(j.ctx, err) continue } } case job.Expire != nil: if err := job.Expire.Start(job.ctx, j.objLayer, *job); err != nil { if !isErrBucketNotFound(err) { logger.LogIf(j.ctx, err) continue } } } job.delete(j.ctx, j.objLayer) j.canceler(job.ID, false) case <-j.workerKillCh: return } } } // ResizeWorkers sets replication workers pool to new size func (j *BatchJobPool) ResizeWorkers(n int) { if j == nil { return } j.mu.Lock() defer j.mu.Unlock() for j.workerSize < n { j.workerSize++ go j.AddWorker() } for j.workerSize > n { j.workerSize-- go func() { j.workerKillCh <- struct{}{} }() } } func (j *BatchJobPool) queueJob(req *BatchJobRequest) error { if j == nil { return errInvalidArgument } jctx, jcancel := context.WithCancel(j.ctx) j.jmu.Lock() j.jobCancelers[req.ID] = jcancel j.jmu.Unlock() req.ctx = jctx select { case <-j.ctx.Done(): j.once.Do(func() { xioutil.SafeClose(j.jobCh) }) case j.jobCh <- req: default: return fmt.Errorf("batch job queue is currently full please try again later %#v", req) } return nil } // delete canceler from the map, cancel job if requested func (j *BatchJobPool) canceler(jobID string, cancel bool) error { if j == nil { return errInvalidArgument } j.jmu.Lock() defer j.jmu.Unlock() if canceler, ok := j.jobCancelers[jobID]; ok { if cancel { canceler() } } delete(j.jobCancelers, jobID) return nil } //msgp:ignore batchJobMetrics type batchJobMetrics struct { sync.RWMutex metrics map[string]*batchJobInfo } //msgp:ignore batchJobMetric //go:generate stringer -type=batchJobMetric -trimprefix=batchJobMetric $GOFILE type batchJobMetric uint8 const ( batchJobMetricReplication batchJobMetric = iota batchJobMetricKeyRotation batchJobMetricExpire ) func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration time.Duration, info objTraceInfoer, attempts int, err error) madmin.TraceInfo { var errStr string if err != nil { errStr = err.Error() } traceType := madmin.TraceBatchReplication switch d { case batchJobMetricKeyRotation: traceType = madmin.TraceBatchKeyRotation case batchJobMetricExpire: traceType = madmin.TraceBatchExpire } funcName := fmt.Sprintf("%s() (job-name=%s)", d.String(), job) if attempts > 0 { funcName = fmt.Sprintf("%s() (job-name=%s,attempts=%s)", d.String(), job, humanize.Ordinal(attempts)) } return madmin.TraceInfo{ TraceType: traceType, Time: startTime, NodeName: globalLocalNodeName, FuncName: funcName, Duration: duration, Path: fmt.Sprintf("%s (versionID=%s)", info.TraceObjName(), info.TraceVersionID()), Error: errStr, } } func (ri *batchJobInfo) metric() madmin.JobMetric { m := madmin.JobMetric{ JobID: ri.JobID, JobType: ri.JobType, StartTime: ri.StartTime, LastUpdate: ri.LastUpdate, RetryAttempts: ri.RetryAttempts, Complete: ri.Complete, Failed: ri.Failed, } switch ri.JobType { case string(madmin.BatchJobReplicate): m.Replicate = &madmin.ReplicateInfo{ Bucket: ri.Bucket, Object: ri.Object, Objects: ri.Objects, ObjectsFailed: ri.ObjectsFailed, BytesTransferred: ri.BytesTransferred, BytesFailed: ri.BytesFailed, } case string(madmin.BatchJobKeyRotate): m.KeyRotate = &madmin.KeyRotationInfo{ Bucket: ri.Bucket, Object: ri.Object, Objects: ri.Objects, ObjectsFailed: ri.ObjectsFailed, } case string(madmin.BatchJobExpire): m.Expired = &madmin.ExpirationInfo{ Bucket: ri.Bucket, Object: ri.Object, Objects: ri.Objects, ObjectsFailed: ri.ObjectsFailed, } } return m } func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics) { metrics = &madmin.BatchJobMetrics{CollectedAt: time.Now(), Jobs: make(map[string]madmin.JobMetric)} m.RLock() defer m.RUnlock() if jobID != "" { if job, ok := m.metrics[jobID]; ok { metrics.Jobs[jobID] = job.metric() } return metrics } for id, job := range m.metrics { metrics.Jobs[id] = job.metric() } return metrics } // keep job metrics for some time after the job is completed // in-case some one wants to look at the older results. func (m *batchJobMetrics) purgeJobMetrics() { t := time.NewTicker(6 * time.Hour) defer t.Stop() for { select { case <-GlobalContext.Done(): return case <-t.C: var toDeleteJobMetrics []string m.RLock() for id, metrics := range m.metrics { if time.Since(metrics.LastUpdate) > 24*time.Hour && (metrics.Complete || metrics.Failed) { toDeleteJobMetrics = append(toDeleteJobMetrics, id) } } m.RUnlock() for _, jobID := range toDeleteJobMetrics { m.delete(jobID) } } } } func (m *batchJobMetrics) delete(jobID string) { m.Lock() defer m.Unlock() delete(m.metrics, jobID) } func (m *batchJobMetrics) save(jobID string, ri *batchJobInfo) { m.Lock() defer m.Unlock() m.metrics[jobID] = ri.clone() } type objTraceInfoer interface { TraceObjName() string TraceVersionID() string } // TraceObjName returns name of object being traced func (td ObjectToDelete) TraceObjName() string { return td.ObjectName } // TraceVersionID returns version-id of object being traced func (td ObjectToDelete) TraceVersionID() string { return td.VersionID } // TraceObjName returns name of object being traced func (oi ObjectInfo) TraceObjName() string { return oi.Name } // TraceVersionID returns version-id of object being traced func (oi ObjectInfo) TraceVersionID() string { return oi.VersionID } func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int) func(info objTraceInfoer, err error) { startTime := time.Now() return func(info objTraceInfoer, err error) { duration := time.Since(startTime) if globalTrace.NumSubscribers(madmin.TraceBatch) > 0 { globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err)) return } switch d { case batchJobMetricReplication: if globalTrace.NumSubscribers(madmin.TraceBatchReplication) > 0 { globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err)) } case batchJobMetricKeyRotation: if globalTrace.NumSubscribers(madmin.TraceBatchKeyRotation) > 0 { globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err)) } case batchJobMetricExpire: if globalTrace.NumSubscribers(madmin.TraceBatchExpire) > 0 { globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err)) } } } } func lookupStyle(s string) miniogo.BucketLookupType { var lookup miniogo.BucketLookupType switch s { case "on": lookup = miniogo.BucketLookupPath case "off": lookup = miniogo.BucketLookupDNS default: lookup = miniogo.BucketLookupAuto } return lookup }