diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go
index 50e17a470..b13aa3437 100644
--- a/cmd/batch-handlers.go
+++ b/cmd/batch-handlers.go
@@ -28,6 +28,8 @@ import (
"math/rand"
"net/http"
"net/url"
+ "runtime"
+ "strconv"
"strings"
"sync"
"time"
@@ -41,7 +43,9 @@ import (
"github.com/minio/minio/internal/auth"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/workers"
"github.com/minio/pkg/console"
+ "github.com/minio/pkg/env"
iampolicy "github.com/minio/pkg/iam/policy"
"github.com/minio/pkg/wildcard"
"gopkg.in/yaml.v2"
@@ -356,6 +360,8 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
// batchJobInfo current batch replication information
type batchJobInfo struct {
+ mu sync.RWMutex `json:"-" msg:"-"`
+
Version int `json:"-" msg:"v"`
JobID string `json:"jobID" msg:"jid"`
JobType string `json:"jobType" msg:"jt"`
@@ -425,6 +431,9 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR
return fmt.Errorf("batchRepl: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
+ ri.mu.Lock()
+ defer ri.mu.Unlock()
+
// OK, parse data.
if _, err = ri.UnmarshalMsg(data[4:]); err != nil {
return err
@@ -439,8 +448,11 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR
return nil
}
-func (ri batchJobInfo) clone() batchJobInfo {
- return batchJobInfo{
+func (ri *batchJobInfo) clone() *batchJobInfo {
+ ri.mu.RLock()
+ defer ri.mu.RUnlock()
+
+ return &batchJobInfo{
Version: ri.Version,
JobID: ri.JobID,
JobType: ri.JobType,
@@ -458,21 +470,6 @@ func (ri batchJobInfo) clone() batchJobInfo {
}
}
-func (ri batchJobInfo) save(ctx context.Context, api ObjectLayer, jobLocation string) error {
- data := make([]byte, 4, ri.Msgsize()+4)
-
- // Initialize the header.
- binary.LittleEndian.PutUint16(data[0:2], batchReplFormat)
- binary.LittleEndian.PutUint16(data[2:4], batchReplVersion)
-
- buf, err := ri.MarshalMsg(data)
- if err != nil {
- return err
- }
-
- return saveConfig(ctx, api, pathJoin(jobLocation, batchReplName), buf)
-}
-
func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) {
if ri == nil {
return
@@ -499,14 +496,28 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
return errInvalidArgument
}
now := UTCNow()
+ ri.mu.Lock()
if now.Sub(ri.LastUpdate) >= duration {
if serverDebugLog {
console.Debugf("batchReplicate: persisting batchReplication info on drive: threshold:%s, batchRepl:%#v\n", now.Sub(ri.LastUpdate), ri)
}
ri.LastUpdate = now
ri.Version = batchReplVersionV1
- return ri.save(ctx, api, jobLocation)
+
+ data := make([]byte, 4, ri.Msgsize()+4)
+
+ // Initialize the header.
+ binary.LittleEndian.PutUint16(data[0:2], batchReplFormat)
+ binary.LittleEndian.PutUint16(data[2:4], batchReplVersion)
+
+ buf, err := ri.MarshalMsg(data)
+ ri.mu.Unlock()
+ if err != nil {
+ return err
+ }
+ return saveConfig(ctx, api, pathJoin(jobLocation, batchReplName), buf)
}
+ ri.mu.Unlock()
return nil
}
@@ -514,6 +525,10 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo,
if ri == nil {
return
}
+
+ ri.mu.Lock()
+ defer ri.mu.Unlock()
+
ri.Bucket = bucket
ri.Object = info.Name
ri.countItem(info.Size, info.DeleteMarker, failed)
@@ -529,7 +544,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
if err := ri.load(ctx, api, job); err != nil {
return err
}
- globalBatchJobsMetrics.save(job.ID, ri.clone())
+ globalBatchJobsMetrics.save(job.ID, ri)
lastObject := ri.Object
delay := job.Replicate.Flags.Retry.Delay
@@ -619,9 +634,22 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
return err
}
+ workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
+ if err != nil {
+ return err
+ }
+
retryAttempts := ri.RetryAttempts
retry := false
for attempts := 1; attempts <= retryAttempts; attempts++ {
+ attempts := attempts
+
+ wk, err := workers.New(workerSize)
+ if err != nil {
+ // invalid worker size.
+ return err
+ }
+
ctx, cancel := context.WithCancel(ctx)
results := make(chan ObjectInfo, 100)
@@ -635,31 +663,40 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
}
for result := range results {
- stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result)
- success := true
- if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil {
- if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
- // object must be deleted concurrently, allow
- // these failures but do not count them
- continue
+ result := result
+ wk.Take()
+ go func() {
+ defer wk.Give()
+
+ stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result)
+ success := true
+ if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil {
+ if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
+ // object must have been deleted concurrently; allow
+ // these failures but do not count them
+ return
+ }
+ stopFn(err)
+ logger.LogIf(ctx, err)
+ success = false
+ } else {
+ stopFn(nil)
}
- stopFn(err)
- logger.LogIf(ctx, err)
- success = false
- } else {
- stopFn(nil)
- }
- ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
- globalBatchJobsMetrics.save(job.ID, ri.clone())
- // persist in-memory state to disk after every 10secs.
- logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job.Location))
+ ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
+ globalBatchJobsMetrics.save(job.ID, ri)
+ // persist in-memory state to disk after every 10secs.
+ logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job.Location))
+ }()
}
+ wk.Wait()
ri.RetryAttempts = attempts
ri.Complete = ri.ObjectsFailed == 0
ri.Failed = ri.ObjectsFailed > 0
- globalBatchJobsMetrics.save(job.ID, ri.clone())
+ globalBatchJobsMetrics.save(job.ID, ri)
+ // persist in-memory state to disk.
+ logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job.Location))
buf, _ := json.Marshal(ri)
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
@@ -1135,11 +1172,11 @@ func (j *BatchJobPool) queueJob(req *BatchJobRequest) error {
//msgp:ignore batchJobMetrics
type batchJobMetrics struct {
sync.RWMutex
- metrics map[string]batchJobInfo
+ metrics map[string]*batchJobInfo
}
var globalBatchJobsMetrics = batchJobMetrics{
- metrics: make(map[string]batchJobInfo),
+ metrics: make(map[string]*batchJobInfo),
}
//msgp:ignore batchReplicationMetric
@@ -1207,11 +1244,11 @@ func (m *batchJobMetrics) delete(jobID string) {
delete(m.metrics, jobID)
}
-func (m *batchJobMetrics) save(jobID string, ri batchJobInfo) {
+func (m *batchJobMetrics) save(jobID string, ri *batchJobInfo) {
m.Lock()
defer m.Unlock()
- m.metrics[jobID] = ri
+ m.metrics[jobID] = ri.clone()
}
func (m *batchJobMetrics) trace(d batchReplicationMetric, job string, attempts int, info ObjectInfo) func(err error) {
diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index 21fe75268..39c63a94d 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -898,8 +898,9 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
defer wg.Done()
err := set.listObjectsToDecommission(ctx, bi,
func(entry metaCacheEntry) {
- parallelWorkers <- struct{}{}
+ // wg.Add() must happen before blocking on a worker token so that wg.Wait() accounts for this entry.
wg.Add(1)
+ parallelWorkers <- struct{}{}
go decommissionEntry(entry)
},
)
diff --git a/internal/jobtokens/jobtokens.go b/internal/workers/workers.go
similarity index 68%
rename from internal/jobtokens/jobtokens.go
rename to internal/workers/workers.go
index 6e7033ea4..4ec42b35b 100644
--- a/internal/jobtokens/jobtokens.go
+++ b/internal/workers/workers.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2022 MinIO, Inc.
+// Copyright (c) 2022-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@@ -15,49 +15,49 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-package jobtokens
+package workers
import (
"errors"
"sync"
)
-// JobTokens provides a bounded semaphore with the ability to wait until all
+// Workers provides a bounded semaphore with the ability to wait until all
// concurrent jobs finish.
-type JobTokens struct {
- wg sync.WaitGroup
- tokens chan struct{}
+type Workers struct {
+ wg sync.WaitGroup
+ queue chan struct{}
}
-// New creates a JobTokens object which allows up to n jobs to proceed
+// New creates a Workers object which allows up to n jobs to proceed
// concurrently. n must be > 0.
-func New(n int) (*JobTokens, error) {
+func New(n int) (*Workers, error) {
if n <= 0 {
return nil, errors.New("n must be > 0")
}
- tokens := make(chan struct{}, n)
+ queue := make(chan struct{}, n)
for i := 0; i < n; i++ {
- tokens <- struct{}{}
+ queue <- struct{}{}
}
- return &JobTokens{
- tokens: tokens,
+ return &Workers{
+ queue: queue,
}, nil
}
// Take is how a job (goroutine) can Take its turn.
-func (jt *JobTokens) Take() {
+func (jt *Workers) Take() {
jt.wg.Add(1)
- <-jt.tokens
+ <-jt.queue
}
// Give is how a job (goroutine) can give back its turn once done.
-func (jt *JobTokens) Give() {
+func (jt *Workers) Give() {
+ jt.queue <- struct{}{}
jt.wg.Done()
- jt.tokens <- struct{}{}
}
// Wait waits for all ongoing concurrent jobs to complete
-func (jt *JobTokens) Wait() {
+func (jt *Workers) Wait() {
jt.wg.Wait()
}
diff --git a/internal/jobtokens/jobtokens_test.go b/internal/workers/workers_test.go
similarity index 85%
rename from internal/jobtokens/jobtokens_test.go
rename to internal/workers/workers_test.go
index a203ac876..29b15f4f0 100644
--- a/internal/jobtokens/jobtokens_test.go
+++ b/internal/workers/workers_test.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2022 MinIO, Inc.
+// Copyright (c) 2022-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@@ -15,7 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-package jobtokens
+package workers
import (
"fmt"
@@ -23,7 +23,7 @@ import (
"testing"
)
-func TestJobTokens(t *testing.T) {
+func TestWorkers(t *testing.T) {
tests := []struct {
n int
jobs int
@@ -59,7 +59,7 @@ func TestJobTokens(t *testing.T) {
testFn := func(n, jobs int, mustFail bool) {
var mu sync.Mutex
var jobsDone int
- // Create jobTokens for n concurrent workers
+ // Create a Workers pool allowing n concurrent jobs
jt, err := New(n)
if err == nil && mustFail {
t.Fatal("Expected test to return error")
@@ -92,8 +92,8 @@ func TestJobTokens(t *testing.T) {
})
}
- // Verify that jobTokens can be reused after full drain
- t.Run("test-jobTokens-reuse", func(t *testing.T) {
+ // Verify that workers can be reused after full drain
+ t.Run("test-workers-reuse", func(t *testing.T) {
var mu sync.Mutex
jt, _ := New(5)
for reuse := 0; reuse < 3; reuse++ {
@@ -115,7 +115,7 @@ func TestJobTokens(t *testing.T) {
})
}
-func benchmarkJobTokens(b *testing.B, n, jobs int) {
+func benchmarkWorkers(b *testing.B, n, jobs int) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
@@ -139,10 +139,10 @@ func benchmarkJobTokens(b *testing.B, n, jobs int) {
})
}
-func BenchmarkJobTokens_N5_J10(b *testing.B) {
- benchmarkJobTokens(b, 5, 10)
+func BenchmarkWorkers_N5_J10(b *testing.B) {
+ benchmarkWorkers(b, 5, 10)
}
-func BenchmarkJobTokens_N5_J100(b *testing.B) {
- benchmarkJobTokens(b, 5, 100)
+func BenchmarkWorkers_N5_J100(b *testing.B) {
+ benchmarkWorkers(b, 5, 100)
}