mirror of
https://github.com/minio/minio.git
synced 2025-11-09 05:34:56 -05:00
add parallel workers in batch replication (#16609)
This commit is contained in:
@@ -28,6 +28,8 @@ import (
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -41,7 +43,9 @@ import (
|
||||
"github.com/minio/minio/internal/auth"
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/minio/internal/workers"
|
||||
"github.com/minio/pkg/console"
|
||||
"github.com/minio/pkg/env"
|
||||
iampolicy "github.com/minio/pkg/iam/policy"
|
||||
"github.com/minio/pkg/wildcard"
|
||||
"gopkg.in/yaml.v2"
|
||||
@@ -356,6 +360,8 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
|
||||
|
||||
// batchJobInfo current batch replication information
|
||||
type batchJobInfo struct {
|
||||
mu sync.RWMutex `json:"-" msg:"-"`
|
||||
|
||||
Version int `json:"-" msg:"v"`
|
||||
JobID string `json:"jobID" msg:"jid"`
|
||||
JobType string `json:"jobType" msg:"jt"`
|
||||
@@ -425,6 +431,9 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR
|
||||
return fmt.Errorf("batchRepl: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
|
||||
}
|
||||
|
||||
ri.mu.Lock()
|
||||
defer ri.mu.Unlock()
|
||||
|
||||
// OK, parse data.
|
||||
if _, err = ri.UnmarshalMsg(data[4:]); err != nil {
|
||||
return err
|
||||
@@ -439,8 +448,11 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ri batchJobInfo) clone() batchJobInfo {
|
||||
return batchJobInfo{
|
||||
func (ri *batchJobInfo) clone() *batchJobInfo {
|
||||
ri.mu.RLock()
|
||||
defer ri.mu.RUnlock()
|
||||
|
||||
return &batchJobInfo{
|
||||
Version: ri.Version,
|
||||
JobID: ri.JobID,
|
||||
JobType: ri.JobType,
|
||||
@@ -458,21 +470,6 @@ func (ri batchJobInfo) clone() batchJobInfo {
|
||||
}
|
||||
}
|
||||
|
||||
func (ri batchJobInfo) save(ctx context.Context, api ObjectLayer, jobLocation string) error {
|
||||
data := make([]byte, 4, ri.Msgsize()+4)
|
||||
|
||||
// Initialize the header.
|
||||
binary.LittleEndian.PutUint16(data[0:2], batchReplFormat)
|
||||
binary.LittleEndian.PutUint16(data[2:4], batchReplVersion)
|
||||
|
||||
buf, err := ri.MarshalMsg(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return saveConfig(ctx, api, pathJoin(jobLocation, batchReplName), buf)
|
||||
}
|
||||
|
||||
func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) {
|
||||
if ri == nil {
|
||||
return
|
||||
@@ -499,14 +496,28 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
|
||||
return errInvalidArgument
|
||||
}
|
||||
now := UTCNow()
|
||||
ri.mu.Lock()
|
||||
if now.Sub(ri.LastUpdate) >= duration {
|
||||
if serverDebugLog {
|
||||
console.Debugf("batchReplicate: persisting batchReplication info on drive: threshold:%s, batchRepl:%#v\n", now.Sub(ri.LastUpdate), ri)
|
||||
}
|
||||
ri.LastUpdate = now
|
||||
ri.Version = batchReplVersionV1
|
||||
return ri.save(ctx, api, jobLocation)
|
||||
|
||||
data := make([]byte, 4, ri.Msgsize()+4)
|
||||
|
||||
// Initialize the header.
|
||||
binary.LittleEndian.PutUint16(data[0:2], batchReplFormat)
|
||||
binary.LittleEndian.PutUint16(data[2:4], batchReplVersion)
|
||||
|
||||
buf, err := ri.MarshalMsg(data)
|
||||
ri.mu.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return saveConfig(ctx, api, pathJoin(jobLocation, batchReplName), buf)
|
||||
}
|
||||
ri.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -514,6 +525,10 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo,
|
||||
if ri == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ri.mu.Lock()
|
||||
defer ri.mu.Unlock()
|
||||
|
||||
ri.Bucket = bucket
|
||||
ri.Object = info.Name
|
||||
ri.countItem(info.Size, info.DeleteMarker, failed)
|
||||
@@ -529,7 +544,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
|
||||
if err := ri.load(ctx, api, job); err != nil {
|
||||
return err
|
||||
}
|
||||
globalBatchJobsMetrics.save(job.ID, ri.clone())
|
||||
globalBatchJobsMetrics.save(job.ID, ri)
|
||||
lastObject := ri.Object
|
||||
|
||||
delay := job.Replicate.Flags.Retry.Delay
|
||||
@@ -619,9 +634,22 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
|
||||
return err
|
||||
}
|
||||
|
||||
workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
retryAttempts := ri.RetryAttempts
|
||||
retry := false
|
||||
for attempts := 1; attempts <= retryAttempts; attempts++ {
|
||||
attempts := attempts
|
||||
|
||||
wk, err := workers.New(workerSize)
|
||||
if err != nil {
|
||||
// invalid worker size.
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
results := make(chan ObjectInfo, 100)
|
||||
@@ -635,31 +663,40 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
|
||||
}
|
||||
|
||||
for result := range results {
|
||||
stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result)
|
||||
success := true
|
||||
if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil {
|
||||
if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
|
||||
// object must be deleted concurrently, allow
|
||||
// these failures but do not count them
|
||||
continue
|
||||
result := result
|
||||
wk.Take()
|
||||
go func() {
|
||||
defer wk.Give()
|
||||
|
||||
stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result)
|
||||
success := true
|
||||
if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil {
|
||||
if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
|
||||
// object must be deleted concurrently, allow
|
||||
// these failures but do not count them
|
||||
return
|
||||
}
|
||||
stopFn(err)
|
||||
logger.LogIf(ctx, err)
|
||||
success = false
|
||||
} else {
|
||||
stopFn(nil)
|
||||
}
|
||||
stopFn(err)
|
||||
logger.LogIf(ctx, err)
|
||||
success = false
|
||||
} else {
|
||||
stopFn(nil)
|
||||
}
|
||||
ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
|
||||
globalBatchJobsMetrics.save(job.ID, ri.clone())
|
||||
// persist in-memory state to disk after every 10secs.
|
||||
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job.Location))
|
||||
ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
|
||||
globalBatchJobsMetrics.save(job.ID, ri)
|
||||
// persist in-memory state to disk after every 10secs.
|
||||
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job.Location))
|
||||
}()
|
||||
}
|
||||
wk.Wait()
|
||||
|
||||
ri.RetryAttempts = attempts
|
||||
ri.Complete = ri.ObjectsFailed == 0
|
||||
ri.Failed = ri.ObjectsFailed > 0
|
||||
|
||||
globalBatchJobsMetrics.save(job.ID, ri.clone())
|
||||
globalBatchJobsMetrics.save(job.ID, ri)
|
||||
// persist in-memory state to disk.
|
||||
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job.Location))
|
||||
|
||||
buf, _ := json.Marshal(ri)
|
||||
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
|
||||
@@ -1135,11 +1172,11 @@ func (j *BatchJobPool) queueJob(req *BatchJobRequest) error {
|
||||
//msgp:ignore batchJobMetrics
|
||||
type batchJobMetrics struct {
|
||||
sync.RWMutex
|
||||
metrics map[string]batchJobInfo
|
||||
metrics map[string]*batchJobInfo
|
||||
}
|
||||
|
||||
var globalBatchJobsMetrics = batchJobMetrics{
|
||||
metrics: make(map[string]batchJobInfo),
|
||||
metrics: make(map[string]*batchJobInfo),
|
||||
}
|
||||
|
||||
//msgp:ignore batchReplicationMetric
|
||||
@@ -1207,11 +1244,11 @@ func (m *batchJobMetrics) delete(jobID string) {
|
||||
delete(m.metrics, jobID)
|
||||
}
|
||||
|
||||
func (m *batchJobMetrics) save(jobID string, ri batchJobInfo) {
|
||||
func (m *batchJobMetrics) save(jobID string, ri *batchJobInfo) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.metrics[jobID] = ri
|
||||
m.metrics[jobID] = ri.clone()
|
||||
}
|
||||
|
||||
func (m *batchJobMetrics) trace(d batchReplicationMetric, job string, attempts int, info ObjectInfo) func(err error) {
|
||||
|
||||
@@ -898,8 +898,9 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
||||
defer wg.Done()
|
||||
err := set.listObjectsToDecommission(ctx, bi,
|
||||
func(entry metaCacheEntry) {
|
||||
parallelWorkers <- struct{}{}
|
||||
// Wait must be synchronized here.
|
||||
wg.Add(1)
|
||||
parallelWorkers <- struct{}{}
|
||||
go decommissionEntry(entry)
|
||||
},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user