diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index 95878e9c0..e766e2557 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
+// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@@ -21,14 +21,17 @@ import (
"context"
"fmt"
"sort"
+ "strconv"
"time"
"github.com/dustin/go-humanize"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config/storageclass"
+ "github.com/minio/minio/internal/jobtokens"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console"
+ "github.com/minio/pkg/env"
"github.com/minio/pkg/wildcard"
)
@@ -161,6 +164,8 @@ func mustGetHealSequence(ctx context.Context) *healSequence {
}
}
+const envHealWorkers = "_MINIO_HEAL_WORKERS"
+
// healErasureSet lists and heals all objects in a specific erasure set
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
bgSeq := mustGetHealSequence(ctx)
@@ -181,6 +186,16 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
}
+ // numHealers - number of concurrent heal jobs, defaults to 1
+ numHealers, err := strconv.Atoi(env.Get(envHealWorkers, "1"))
+ if err != nil {
+ logger.LogIf(ctx, fmt.Errorf("invalid %s value %v, defaulting to 1", envHealWorkers, err))
+ }
+ if numHealers < 1 {
+ numHealers = 1
+ }
+ // jt will never be nil since we ensure that numHealers > 0
+ jt, _ := jobtokens.New(numHealers)
var retErr error
// Heal all buckets with all objects
for _, bucket := range healBuckets {
@@ -229,6 +244,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
healEntry := func(entry metaCacheEntry) {
+ defer jt.Give()
+
if entry.name == "" && len(entry.metadata) == 0 {
// ignore entries that don't have metadata.
return
@@ -308,14 +325,17 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
bucket: bucket,
}
- err := listPathRaw(ctx, listPathRawOptions{
+ err = listPathRaw(ctx, listPathRawOptions{
disks: disks,
bucket: bucket,
recursive: true,
forwardTo: forwardTo,
minDisks: 1,
reportNotFound: false,
- agreed: healEntry,
+ agreed: func(entry metaCacheEntry) {
+ jt.Take()
+ go healEntry(entry)
+ },
partial: func(entries metaCacheEntries, _ []error) {
entry, ok := entries.resolve(&resolver)
if !ok {
@@ -323,10 +343,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
// proceed to heal nonetheless.
entry, _ = entries.firstFound()
}
- healEntry(*entry)
+ jt.Take()
+ go healEntry(*entry)
},
finished: nil,
})
+ jt.Wait() // synchronize all the concurrent heal jobs
if err != nil {
// Set this such that when we return this function
// we let the caller retry this disk again for the
diff --git a/internal/jobtokens/jobtokens.go b/internal/jobtokens/jobtokens.go
new file mode 100644
index 000000000..6e7033ea4
--- /dev/null
+++ b/internal/jobtokens/jobtokens.go
@@ -0,0 +1,63 @@
+// Copyright (c) 2022 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package jobtokens
+
+import (
+ "errors"
+ "sync"
+)
+
+// JobTokens provides a bounded semaphore with the ability to wait until all
+// concurrent jobs finish.
+type JobTokens struct {
+ wg sync.WaitGroup
+ tokens chan struct{}
+}
+
+// New creates a JobTokens object which allows up to n jobs to proceed
+// concurrently. n must be > 0.
+func New(n int) (*JobTokens, error) {
+ if n <= 0 {
+ return nil, errors.New("n must be > 0")
+ }
+
+ tokens := make(chan struct{}, n)
+ for i := 0; i < n; i++ {
+ tokens <- struct{}{}
+ }
+ return &JobTokens{
+ tokens: tokens,
+ }, nil
+}
+
+// Take is how a job (goroutine) can Take its turn.
+func (jt *JobTokens) Take() {
+ jt.wg.Add(1)
+ <-jt.tokens
+}
+
+// Give is how a job (goroutine) can give back its turn once done.
+func (jt *JobTokens) Give() {
+ jt.wg.Done()
+ jt.tokens <- struct{}{}
+}
+
+// Wait waits for all ongoing concurrent jobs to complete
+func (jt *JobTokens) Wait() {
+ jt.wg.Wait()
+}
diff --git a/internal/jobtokens/jobtokens_test.go b/internal/jobtokens/jobtokens_test.go
new file mode 100644
index 000000000..a203ac876
--- /dev/null
+++ b/internal/jobtokens/jobtokens_test.go
@@ -0,0 +1,148 @@
+// Copyright (c) 2022 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package jobtokens
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+)
+
+func TestJobTokens(t *testing.T) {
+ tests := []struct {
+ n int
+ jobs int
+ mustFail bool
+ }{
+ {
+ n: 0,
+ jobs: 5,
+ mustFail: true,
+ },
+ {
+ n: -1,
+ jobs: 5,
+ mustFail: true,
+ },
+ {
+ n: 1,
+ jobs: 5,
+ },
+ {
+ n: 2,
+ jobs: 5,
+ },
+ {
+ n: 5,
+ jobs: 10,
+ },
+ {
+ n: 10,
+ jobs: 5,
+ },
+ }
+ testFn := func(n, jobs int, mustFail bool) {
+ var mu sync.Mutex
+ var jobsDone int
+ // Create jobTokens for n concurrent workers
+ jt, err := New(n)
+ if err == nil && mustFail {
+ t.Fatal("Expected test to return error")
+ }
+ if err != nil && mustFail {
+ return
+ }
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+
+ for i := 0; i < jobs; i++ {
+ jt.Take()
+ go func() { // Launch a worker after acquiring a token
+ defer jt.Give() // Give token back once done
+ mu.Lock()
+ jobsDone++
+ mu.Unlock()
+ }()
+ }
+ jt.Wait() // Wait for all workers to complete
+ if jobsDone != jobs {
+ t.Fatalf("Expected %d jobs to be done but only %d were done", jobs, jobsDone)
+ }
+ }
+
+ for i, test := range tests {
+ t.Run(fmt.Sprintf("test-%d", i), func(t *testing.T) {
+ testFn(test.n, test.jobs, test.mustFail)
+ })
+ }
+
+ // Verify that jobTokens can be reused after full drain
+ t.Run("test-jobTokens-reuse", func(t *testing.T) {
+ var mu sync.Mutex
+ jt, _ := New(5)
+ for reuse := 0; reuse < 3; reuse++ {
+ var jobsDone int
+ for i := 0; i < 10; i++ {
+ jt.Take()
+ go func() {
+ defer jt.Give()
+ mu.Lock()
+ jobsDone++
+ mu.Unlock()
+ }()
+ }
+ jt.Wait()
+ if jobsDone != 10 {
+ t.Fatalf("Expected %d jobs to be complete but only %d were", 10, jobsDone)
+ }
+ }
+ })
+}
+
+func benchmarkJobTokens(b *testing.B, n, jobs int) {
+ b.ReportAllocs()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ var mu sync.Mutex
+ var jobsDone int
+ jt, _ := New(n)
+ for i := 0; i < jobs; i++ {
+ jt.Take()
+ go func() {
+ defer jt.Give()
+ mu.Lock()
+ jobsDone++
+ mu.Unlock()
+ }()
+ }
+ jt.Wait()
+ if jobsDone != jobs {
+ b.Fail()
+ }
+ }
+ })
+}
+
+func BenchmarkJobTokens_N5_J10(b *testing.B) {
+ benchmarkJobTokens(b, 5, 10)
+}
+
+func BenchmarkJobTokens_N5_J100(b *testing.B) {
+ benchmarkJobTokens(b, 5, 100)
+}