diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index a0bde1c5b..31761fb7d 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -2657,7 +2657,7 @@ func fetchHealthInfo(healthCtx context.Context, objectAPI ObjectLayer, query *ur } go func() { - defer close(healthInfoCh) + defer xioutil.SafeClose(healthInfoCh) partialWrite(healthInfo) // Write first message with only version and deployment id populated getAndWritePlatformInfo() @@ -3046,7 +3046,7 @@ func getClusterMetaInfo(ctx context.Context) []byte { resultCh := make(chan madmin.ClusterRegistrationInfo) go func() { - defer close(resultCh) + defer xioutil.SafeClose(resultCh) ci := madmin.ClusterRegistrationInfo{} ci.Info.NoOfServerPools = len(globalEndpoints) diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index e607a728b..ffbc9c0c5 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -28,6 +28,7 @@ import ( "time" "github.com/minio/madmin-go/v3" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" ) @@ -800,7 +801,7 @@ func (h *healSequence) healItems(objAPI ObjectLayer, bucketsOnly bool) error { func (h *healSequence) traverseAndHeal(objAPI ObjectLayer) { bucketsOnly := false // Heals buckets and objects also. h.traverseAndHealDoneCh <- h.healItems(objAPI, bucketsOnly) - close(h.traverseAndHealDoneCh) + xioutil.SafeClose(h.traverseAndHealDoneCh) } // healMinioSysMeta - heals all files under a given meta prefix, returns a function diff --git a/cmd/batch-expire.go b/cmd/batch-expire.go index a6b84c8e0..72bc48330 100644 --- a/cmd/batch-expire.go +++ b/cmd/batch-expire.go @@ -32,6 +32,7 @@ import ( "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/internal/bucket/versioning" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/env" "github.com/minio/pkg/v2/wildcard" @@ -648,7 +649,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo case expireCh <- toDel: } } - close(expireCh) + xioutil.SafeClose(expireCh) wk.Wait() // waits for all expire goroutines to complete @@ -658,7 +659,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo // Close the saverQuitCh - this also triggers saving in-memory state // immediately one last time before we exit this method. - close(saverQuitCh) + xioutil.SafeClose(saverQuitCh) // Notify expire jobs final status to the configured endpoint buf, _ := json.Marshal(ri) diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 20730ed5f..bed11fd5c 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -47,6 +47,7 @@ import ( "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/ioutil" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/console" "github.com/minio/pkg/v2/env" @@ -545,7 +546,7 @@ func (r BatchJobReplicateV1) writeAsArchive(ctx context.Context, objAPI ObjectLa } go func() { - defer close(input) + defer xioutil.SafeClose(input) for _, entry := range entries { gr, err := objAPI.GetObjectNInfo(ctx, r.Source.Bucket, @@ -1038,7 +1039,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { go func() { - defer close(slowCh) + defer xioutil.SafeClose(slowCh) // Snowball currently needs the high level minio-go Client, not the Core one cl, err := miniogo.New(u.Host, &miniogo.Options{ @@ -1809,7 +1810,7 @@ func (j *BatchJobPool) queueJob(req *BatchJobRequest) error { select { case <-j.ctx.Done(): j.once.Do(func() { - close(j.jobCh) + xioutil.SafeClose(j.jobCh) }) case j.jobCh <- req: default: diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index da94d5c96..22450bb7f 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -39,6 +39,7 @@ import ( "github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/event" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/s3select" "github.com/minio/pkg/v2/env" @@ -118,8 +119,8 @@ func (es *expiryState) PendingTasks() int { // close closes work channels exactly once. func (es *expiryState) close() { es.once.Do(func() { - close(es.byDaysCh) - close(es.byNewerNoncurrentCh) + xioutil.SafeClose(es.byDaysCh) + xioutil.SafeClose(es.byNewerNoncurrentCh) }) } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 63eb2dc76..ab6096538 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -47,6 +47,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/tinylib/msgp/msgp" "github.com/zeebo/xxh3" @@ -1894,7 +1895,7 @@ func (p *ReplicationPool) AddLargeWorkers() { go func() { <-p.ctx.Done() for i := 0; i < LargeWorkerCount; i++ { - close(p.lrgworkers[i]) + xioutil.SafeClose(p.lrgworkers[i]) } }() } @@ -1953,7 +1954,7 @@ func (p *ReplicationPool) ResizeWorkers(n, checkOld int) { for len(p.workers) > n { worker := p.workers[len(p.workers)-1] p.workers = p.workers[:len(p.workers)-1] - close(worker) + xioutil.SafeClose(worker) } } @@ -2755,7 +2756,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object } workers := make([]chan ReplicateObjectInfo, resyncParallelRoutines) resultCh := make(chan TargetReplicationResyncStatus, 1) - defer close(resultCh) + defer xioutil.SafeClose(resultCh) go func() { for r := range resultCh { s.incStats(r, opts) @@ -2867,7 +2868,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object } } for i := 0; i < resyncParallelRoutines; i++ { - close(workers[i]) + xioutil.SafeClose(workers[i]) } wg.Wait() resyncStatus = ResyncCompleted @@ -3123,7 +3124,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, } diffCh := make(chan madmin.DiffInfo, 4000) go func() { - defer close(diffCh) + defer xioutil.SafeClose(diffCh) for obj := range objInfoCh { if contextCanceled(ctx) { // Just consume input... @@ -3316,7 +3317,7 @@ func (p *ReplicationPool) persistMRF() { mTimer.Reset(mrfSaveInterval) case <-p.ctx.Done(): p.mrfStopCh <- struct{}{} - close(p.mrfSaveCh) + xioutil.SafeClose(p.mrfSaveCh) // We try to save if possible, but we don't care beyond that. saveMRFToDisk() return @@ -3551,7 +3552,7 @@ func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch <-chan mrfCh := make(chan madmin.ReplicationMRF, 100) go func() { - defer close(mrfCh) + defer xioutil.SafeClose(mrfCh) for vID, e := range mrfRec.Entries { if bucket != "" && e.Bucket != bucket { continue diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index bd1fc5e30..6a4934f4b 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -40,6 +40,7 @@ import ( "github.com/minio/minio/internal/color" "github.com/minio/minio/internal/config/heal" "github.com/minio/minio/internal/event" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/console" uatomic "go.uber.org/atomic" @@ -1479,7 +1480,7 @@ func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error { return nil } // Update values and cycle waiting. - close(d.cycle) + xioutil.SafeClose(d.cycle) d.factor = factor d.maxSleep = maxWait d.cycle = make(chan struct{}) diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index a2bd568c6..3a04c5664 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -25,6 +25,7 @@ import ( "sync" "sync/atomic" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" ) @@ -118,7 +119,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) { } readTriggerCh := make(chan bool, len(p.readers)) - defer close(readTriggerCh) // close the channel upon return + defer xioutil.SafeClose(readTriggerCh) // close the channel upon return for i := 0; i < p.dataBlocks; i++ { // Setup read triggers for p.dataBlocks number of reads so that it reads in parallel. diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 0a365de13..5cb8f52a3 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -760,7 +760,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s } mrfCheck := make(chan FileInfo) - defer close(mrfCheck) + defer xioutil.SafeClose(mrfCheck) var rw sync.Mutex @@ -810,7 +810,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s } wg.Wait() - close(done) + xioutil.SafeClose(done) fi, ok := <-mrfCheck if !ok { diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go index 3cf32a329..cfcb30148 100644 --- a/cmd/erasure-server-pool-rebalance.go +++ b/cmd/erasure-server-pool-rebalance.go @@ -34,6 +34,7 @@ import ( "github.com/lithammer/shortuuid/v4" "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/hash" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/env" ) @@ -372,7 +373,7 @@ func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool { func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) { doneCh := make(chan struct{}) - defer close(doneCh) + defer xioutil.SafeClose(doneCh) // Save rebalance.bin periodically. go func() { diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 690a448bb..60fb74e96 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -40,6 +40,7 @@ import ( "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/internal/bpool" "github.com/minio/minio/internal/config/storageclass" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/sync/errgroup" "github.com/minio/pkg/v2/wildcard" @@ -653,7 +654,7 @@ func (z *erasureServerPools) StorageInfo(ctx context.Context, metrics bool) Stor func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error { // Updates must be closed before we return. - defer close(updates) + defer xioutil.SafeClose(updates) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -680,7 +681,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataU results = append(results, dataUsageCache{}) go func(i int, erObj *erasureObjects) { updates := make(chan dataUsageCache, 1) - defer close(updates) + defer xioutil.SafeClose(updates) // Start update collector. go func() { defer wg.Done() @@ -739,7 +740,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataU return case v := <-updateCloser: update() - close(v) + xioutil.SafeClose(v) return case <-updateTicker.C: update() @@ -1957,7 +1958,7 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error { if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil { // Upon error close the channel. - close(results) + xioutil.SafeClose(results) return err } @@ -1966,7 +1967,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re ctx, cancel := context.WithCancel(ctx) go func() { defer cancel() - defer close(results) + defer xioutil.SafeClose(results) for _, erasureSet := range z.serverPools { var wg sync.WaitGroup diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 05f52bf6f..f1374c312 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -37,6 +37,7 @@ import ( "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/internal/dsync" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/console" "github.com/minio/pkg/v2/sync/errgroup" @@ -667,10 +668,10 @@ func (s *erasureSets) Shutdown(ctx context.Context) error { select { case _, ok := <-s.setReconnectEvent: if ok { - close(s.setReconnectEvent) + xioutil.SafeClose(s.setReconnectEvent) } default: - close(s.setReconnectEvent) + xioutil.SafeClose(s.setReconnectEvent) } return nil } diff --git a/cmd/erasure.go b/cmd/erasure.go index 887a9f9d6..dfab4593e 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -423,7 +423,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa bucketCh <- b } } - close(bucketCh) + xioutil.SafeClose(bucketCh) bucketResults := make(chan dataUsageEntryInfo, len(disks)) @@ -560,7 +560,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa }(i) } wg.Wait() - close(bucketResults) + xioutil.SafeClose(bucketResults) saverWg.Wait() return nil diff --git a/cmd/ftp-server-driver.go b/cmd/ftp-server-driver.go index ebbd07fd6..2b55357db 100644 --- a/cmd/ftp-server-driver.go +++ b/cmd/ftp-server-driver.go @@ -32,6 +32,7 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio/internal/auth" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" ftp "goftp.io/server/v2" ) @@ -386,7 +387,7 @@ func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, path string) (err error) { // Send object names that are needed to be removed to objectsCh go func() { - defer close(objectsCh) + defer xioutil.SafeClose(objectsCh) opts := minio.ListObjectsOptions{ Prefix: prefix, Recursive: true, diff --git a/cmd/global-heal.go b/cmd/global-heal.go index fd566212c..73423b12b 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -28,6 +28,7 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/color" "github.com/minio/minio/internal/config/storageclass" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/console" "github.com/minio/pkg/v2/wildcard" @@ -443,7 +444,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, finished: nil, }) jt.Wait() // synchronize all the concurrent heal jobs - close(results) + xioutil.SafeClose(results) if err != nil { // Set this such that when we return this function // we let the caller retry this disk again for the diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index 51263e130..b2c6b2ca2 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -31,6 +31,7 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/internal/config" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" ) @@ -575,7 +576,7 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri ch := make(chan itemOrErr) go func() { - defer close(ch) + defer xioutil.SafeClose(ch) // Allocate new results channel to receive ObjectInfo. objInfoCh := make(chan ObjectInfo) diff --git a/cmd/iam.go b/cmd/iam.go index cd916895c..15782fc98 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -45,6 +45,7 @@ import ( "github.com/minio/minio/internal/config/policy/opa" polplugin "github.com/minio/minio/internal/config/policy/plugin" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/jwt" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/policy" @@ -207,7 +208,7 @@ func (sys *IAMSys) Load(ctx context.Context, firstTime bool) error { select { case <-sys.configLoaded: default: - close(sys.configLoaded) + xioutil.SafeClose(sys.configLoaded) } return nil } diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index 84150dfd8..a1688f998 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -26,6 +26,7 @@ import ( "sort" "strings" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/console" ) @@ -659,7 +660,7 @@ func (m *metaCacheEntriesSorted) forwardPast(s string) { // If the context is canceled the function will return the error, // otherwise the function will return nil. func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<- metaCacheEntry, readQuorum int) error { - defer close(out) + defer xioutil.SafeClose(out) top := make([]*metaCacheEntry, len(in)) nDone := 0 ctxDone := ctx.Done() diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 80af7b2a6..99476293b 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -28,6 +28,7 @@ import ( "sync" "time" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" ) @@ -339,7 +340,7 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions, // When 'in' is closed or the context is canceled the // function closes 'out' and exits. func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCacheEntry, out chan<- metaCacheEntry) { - defer close(out) + defer xioutil.SafeClose(out) for { var obj metaCacheEntry @@ -472,16 +473,16 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions funcReturnedMu.Unlock() outCh <- entry if returned { - close(outCh) + xioutil.SafeClose(outCh) } } entry.reusable = returned saveCh <- entry } if !returned { - close(outCh) + xioutil.SafeClose(outCh) } - close(saveCh) + xioutil.SafeClose(saveCh) }() return filteredResults() diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index e6ae83b15..a77e9c011 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -37,6 +37,7 @@ import ( "github.com/minio/minio/internal/bucket/versioning" "github.com/minio/minio/internal/color" "github.com/minio/minio/internal/hash" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/console" ) @@ -679,7 +680,7 @@ func getQuorumDisks(disks []StorageAPI, infos []DiskInfo, readQuorum int) (newDi // Will return io.EOF if continuing would not yield more results. func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) { - defer close(results) + defer xioutil.SafeClose(results) o.debugf(color.Green("listPath:")+" with options: %#v", o) // get prioritized non-healing disks for listing diff --git a/cmd/metacache-stream.go b/cmd/metacache-stream.go index 96c93894c..f6d08245e 100644 --- a/cmd/metacache-stream.go +++ b/cmd/metacache-stream.go @@ -27,6 +27,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/klauspost/compress/s2" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/tinylib/msgp/msgp" "github.com/valyala/bytebufferpool" @@ -550,7 +551,7 @@ func (r *metacacheReader) readAll(ctx context.Context, dst chan<- metaCacheEntry if r.err != nil { return r.err } - defer close(dst) + defer xioutil.SafeClose(dst) if r.current.name != "" { select { case <-ctx.Done(): diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index a30244b1d..174f05e89 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -94,7 +94,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ if err != nil { return err } - defer close(out) + defer xioutil.SafeClose(out) var objsReturned int objReturned := func(metadata []byte) { diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 3e8c64203..c2f1aa7e8 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -32,6 +32,7 @@ import ( "github.com/minio/kes-go" "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/bucket/lifecycle" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/mcontext" "github.com/minio/minio/internal/rest" @@ -1724,7 +1725,7 @@ func getGoMetrics() *MetricsGroup { func getHistogramMetrics(hist *prometheus.HistogramVec, desc MetricDescription) []Metric { ch := make(chan prometheus.Metric) go func() { - defer close(ch) + defer xioutil.SafeClose(ch) // Collects prometheus metrics from hist and sends it over ch hist.Collect(ch) }() @@ -3881,7 +3882,7 @@ func (c *minioClusterCollector) Collect(out chan<- prometheus.Metric) { func ReportMetrics(ctx context.Context, metricsGroups []*MetricsGroup) <-chan Metric { ch := make(chan Metric) go func() { - defer close(ch) + defer xioutil.SafeClose(ch) populateAndPublish(metricsGroups, func(m Metric) bool { if m.VariableLabels == nil { m.VariableLabels = make(map[string]string) diff --git a/cmd/notification.go b/cmd/notification.go index 13c304e4e..c63bc0fb5 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -33,6 +33,7 @@ import ( "github.com/cespare/xxhash/v2" "github.com/klauspost/compress/zip" "github.com/minio/madmin-go/v3" + xioutil "github.com/minio/minio/internal/ioutil" xnet "github.com/minio/pkg/v2/net" "github.com/minio/pkg/v2/sync/errgroup" "github.com/minio/pkg/v2/workers" @@ -1263,7 +1264,7 @@ func (sys *NotificationSys) collectPeerMetrics(ctx context.Context, peerChannels } go func(wg *sync.WaitGroup, ch chan Metric) { wg.Wait() - close(ch) + xioutil.SafeClose(ch) }(&wg, ch) return ch } @@ -1488,7 +1489,7 @@ func (sys *NotificationSys) DriveSpeedTest(ctx context.Context, opts madmin.Driv go func(wg *sync.WaitGroup, ch chan madmin.DriveSpeedTestResult) { wg.Wait() - close(ch) + xioutil.SafeClose(ch) }(&wg, ch) return ch @@ -1616,7 +1617,7 @@ func (sys *NotificationSys) GetReplicationMRF(ctx context.Context, bucket, node }(mrfCh) go func(wg *sync.WaitGroup) { wg.Wait() - close(mrfCh) + xioutil.SafeClose(mrfCh) }(&wg) return mrfCh, nil } diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 413d39ebc..d7724f3bb 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -46,6 +46,7 @@ import ( "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/ioutil" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/trie" "github.com/minio/pkg/v2/wildcard" @@ -1081,7 +1082,7 @@ func newS2CompressReader(r io.Reader, on int64, encrypted bool) (rc io.ReadClose comp := s2.NewWriter(pw, opts...) indexCh := make(chan []byte, 1) go func() { - defer close(indexCh) + defer xioutil.SafeClose(indexCh) cn, err := io.Copy(comp, r) if err != nil { comp.Close() diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index fcffc5bdf..88e09022a 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -34,6 +34,7 @@ import ( "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/event" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/rest" "github.com/minio/pkg/v2/logger/message/log" @@ -253,7 +254,7 @@ func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Me go func(ch chan<- Metric) { defer func() { xhttp.DrainBody(respBody) - close(ch) + xioutil.SafeClose(ch) }() for { var metric Metric @@ -625,7 +626,7 @@ func (client *peerRESTClient) doTrace(traceCh chan<- madmin.TraceInfo, doneCh <- ctx, cancel := context.WithCancel(GlobalContext) cancelCh := make(chan struct{}) - defer close(cancelCh) + defer xioutil.SafeClose(cancelCh) go func() { select { case <-doneCh: @@ -663,7 +664,7 @@ func (client *peerRESTClient) doListen(listenCh chan<- event.Event, doneCh <-cha ctx, cancel := context.WithCancel(GlobalContext) cancelCh := make(chan struct{}) - defer close(cancelCh) + defer xioutil.SafeClose(cancelCh) go func() { select { case <-doneCh: @@ -733,7 +734,7 @@ func (client *peerRESTClient) doConsoleLog(logCh chan log.Info, doneCh <-chan st ctx, cancel := context.WithCancel(GlobalContext) cancelCh := make(chan struct{}) - defer close(cancelCh) + defer xioutil.SafeClose(cancelCh) go func() { select { case <-doneCh: @@ -860,7 +861,7 @@ func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric go func(ch chan<- Metric) { defer func() { xhttp.DrainBody(respBody) - close(ch) + xioutil.SafeClose(ch) }() for { var metric Metric @@ -887,7 +888,7 @@ func (client *peerRESTClient) GetPeerBucketMetrics(ctx context.Context) (<-chan go func(ch chan<- Metric) { defer func() { xhttp.DrainBody(respBody) - close(ch) + xioutil.SafeClose(ch) }() for { var metric Metric @@ -1025,7 +1026,7 @@ func (client *peerRESTClient) GetReplicationMRF(ctx context.Context, bucket stri go func(ch chan madmin.ReplicationMRF) { defer func() { xhttp.DrainBody(respBody) - close(ch) + xioutil.SafeClose(ch) }() for { var entry madmin.ReplicationMRF diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 92ea4f50c..efbdad203 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1237,7 +1237,7 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques } doneCh := make(chan struct{}) - defer close(doneCh) + defer xioutil.SafeClose(doneCh) ch := make(chan log.Info, 100000) err := globalConsoleSys.Subscribe(ch, doneCh, "", 0, madmin.LogMaskAll, nil) @@ -1298,7 +1298,7 @@ func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) { bucketsString := r.Form.Get("buckets") doneCh := make(chan struct{}) - defer close(doneCh) + defer xioutil.SafeClose(doneCh) selectBuckets := b.SelectBuckets(strings.Split(bucketsString, ",")...) report := globalBucketMonitor.GetReport(selectBuckets) diff --git a/cmd/perf-tests.go b/cmd/perf-tests.go index 26b335687..5a9bcf380 100644 --- a/cmd/perf-tests.go +++ b/cmd/perf-tests.go @@ -318,7 +318,7 @@ func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResu } time.Sleep(duration) - close(r.eof) + xioutil.SafeClose(r.eof) wg.Wait() for { if globalNetPerfRX.ActiveConnections() == 0 { @@ -376,7 +376,7 @@ func siteNetperf(ctx context.Context, duration time.Duration) madmin.SiteNetPerf } time.Sleep(duration) - close(r.eof) + xioutil.SafeClose(r.eof) wg.Wait() for { if globalSiteNetPerfRX.ActiveConnections() == 0 || contextCanceled(ctx) { diff --git a/cmd/server-main.go b/cmd/server-main.go index c2b92536e..3d84e4361 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -47,6 +47,7 @@ import ( "github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/hash/sha256" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/certs" "github.com/minio/pkg/v2/env" @@ -737,7 +738,7 @@ func serverMain(ctx *cli.Context) { logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services") } // Allow grid to start after registering all services. - close(globalGridStart) + xioutil.SafeClose(globalGridStart) httpServer := xhttp.NewServer(getServerListenAddrs()). UseHandler(setCriticalErrorHandler(corsHandler(handler))). diff --git a/cmd/service.go b/cmd/service.go index a89eeefa0..ebe7394b5 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -23,6 +23,8 @@ import ( "os/exec" "runtime" "syscall" + + xioutil "github.com/minio/minio/internal/ioutil" ) // Type of service signals currently supported. @@ -109,7 +111,7 @@ func unfreezeServices() { if val := globalServiceFreeze.Swap(_ch); val != nil { if ch, ok := val.(chan struct{}); ok && ch != nil { // Close previous non-nil channel. - close(ch) + xioutil.SafeClose(ch) } } globalServiceFreezeCnt = 0 // Don't risk going negative. diff --git a/cmd/sftp-server-driver.go b/cmd/sftp-server-driver.go index 9329e35ee..a713c30e3 100644 --- a/cmd/sftp-server-driver.go +++ b/cmd/sftp-server-driver.go @@ -32,6 +32,7 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio/internal/auth" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" @@ -357,7 +358,7 @@ func (f *sftpDriver) Filecmd(r *sftp.Request) (err error) { // Send object names that are needed to be removed to objectsCh go func() { - defer close(objectsCh) + defer xioutil.SafeClose(objectsCh) opts := minio.ListObjectsOptions{ Prefix: prefix, Recursive: true, diff --git a/cmd/speedtest.go b/cmd/speedtest.go index 0bbd7af1f..7226686f3 100644 --- a/cmd/speedtest.go +++ b/cmd/speedtest.go @@ -27,6 +27,7 @@ import ( "github.com/minio/dperf/pkg/dperf" "github.com/minio/madmin-go/v3" + xioutil "github.com/minio/minio/internal/ioutil" ) const speedTest = "speedtest" @@ -46,7 +47,7 @@ type speedTestOpts struct { func objectSpeedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestResult { ch := make(chan madmin.SpeedTestResult, 1) go func() { - defer close(ch) + defer xioutil.SafeClose(ch) concurrency := opts.concurrencyStart diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 62b08f8d5..9d165b148 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -23,6 +23,7 @@ import ( "time" "github.com/minio/madmin-go/v3" + xioutil "github.com/minio/minio/internal/ioutil" ) // StorageAPI interface. @@ -280,7 +281,7 @@ func (p *unrecognizedDisk) StatInfoFile(ctx context.Context, volume, path string } func (p *unrecognizedDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { - close(resp) + xioutil.SafeClose(resp) return errDiskNotFound } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 61e5f045b..1afce16c1 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -37,6 +37,7 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/grid" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/rest" xnet "github.com/minio/pkg/v2/net" @@ -232,7 +233,7 @@ func (client *storageRESTClient) Healing() *healingTracker { func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, _ func() bool) (dataUsageCache, error) { atomic.AddInt32(&client.scanning, 1) defer atomic.AddInt32(&client.scanning, -1) - defer close(updates) + defer xioutil.SafeClose(updates) st, err := storageNSScannerHandler.Call(ctx, client.gridConn, &nsScannerOptions{ DiskID: client.diskID, @@ -771,7 +772,7 @@ func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path // The resp channel is closed before the call returns. // Only a canceled context or network errors returns an error. func (client *storageRESTClient) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { - defer close(resp) + defer xioutil.SafeClose(resp) body, err := req.MarshalMsg(nil) if err != nil { return err diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 723298e46..03527e05a 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -778,7 +778,7 @@ func (c *closeNotifier) Read(p []byte) (n int, err error) { n, err = c.rc.Read(p) if err != nil { if c.done != nil { - close(c.done) + xioutil.SafeClose(c.done) c.done = nil } } @@ -787,7 +787,7 @@ func (c *closeNotifier) Read(p []byte) (n int, err error) { func (c *closeNotifier) Close() error { if c.done != nil { - close(c.done) + xioutil.SafeClose(c.done) c.done = nil } return c.rc.Close() @@ -826,10 +826,10 @@ func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func } else { write([]byte{0}) } - close(doneCh) + xioutil.SafeClose(doneCh) return } - defer close(doneCh) + defer xioutil.SafeClose(doneCh) // Initiate ticker after body has been read. ticker := time.NewTicker(time.Second * 10) for { @@ -889,7 +889,7 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) { } } } - defer close(doneCh) + defer xioutil.SafeClose(doneCh) ticker := time.NewTicker(time.Second * 10) defer ticker.Stop() for { @@ -1027,7 +1027,7 @@ func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse { } else { write([]byte{0}) } - close(doneCh) + xioutil.SafeClose(doneCh) return case block := <-blockCh: var tmp [5]byte diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index d404a8461..f7b6c8006 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -279,12 +279,12 @@ func (p *xlStorageDiskIDCheck) Healing() *healingTracker { func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, _ func() bool) (dataUsageCache, error) { if contextCanceled(ctx) { - close(updates) + xioutil.SafeClose(updates) return dataUsageCache{}, ctx.Err() } if err := p.checkDiskStale(); err != nil { - close(updates) + xioutil.SafeClose(updates) return dataUsageCache{}, err } @@ -733,7 +733,7 @@ func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path st func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) (err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadMultiple, req.Bucket, req.Prefix) if err != nil { - close(resp) + xioutil.SafeClose(resp) return err } defer done(&err) diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 071c7c527..3c3d3d014 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -491,7 +491,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates }() // Updates must be closed before we return. - defer close(updates) + defer xioutil.SafeClose(updates) var lc *lifecycle.Lifecycle // Check if the current bucket has a configured lifecycle policy @@ -2803,7 +2803,7 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File // The resp channel is closed before the call returns. // Only a canceled context will return an error. func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { - defer close(resp) + defer xioutil.SafeClose(resp) volumeDir := pathJoin(s.drivePath, req.Bucket) found := 0 diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go index 09da06f40..4799b7c36 100644 --- a/internal/dsync/drwmutex.go +++ b/internal/dsync/drwmutex.go @@ -26,6 +26,7 @@ import ( "sync" "time" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/mcontext" "github.com/minio/pkg/v2/console" "github.com/minio/pkg/v2/env" @@ -406,7 +407,7 @@ func refreshLock(ctx context.Context, ds *Dsync, id, source string, quorum int) // We may have some unused results in ch, release them async. go func() { wg.Wait() - close(ch) + xioutil.SafeClose(ch) for range ch { } }() @@ -528,7 +529,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is // We may have some unused results in ch, release them async. go func() { wg.Wait() - close(ch) + xioutil.SafeClose(ch) for grantToBeReleased := range ch { if grantToBeReleased.isLocked() { // release abandoned lock diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 352eba4e2..06a5f8284 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -40,6 +40,7 @@ import ( "github.com/gobwas/ws/wsutil" "github.com/google/uuid" "github.com/minio/madmin-go/v3" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/pubsub" "github.com/tinylib/msgp/msgp" @@ -449,7 +450,7 @@ func (c *Connection) WaitForConnect(ctx context.Context) error { defer cancel() changed := make(chan State, 1) go func() { - defer close(changed) + defer xioutil.SafeClose(changed) for { c.connChange.Wait() newState := c.State() diff --git a/internal/grid/debug.go b/internal/grid/debug.go index 2f7fee487..eddb577e7 100644 --- a/internal/grid/debug.go +++ b/internal/grid/debug.go @@ -26,6 +26,7 @@ import ( "sync" "time" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/mux" ) @@ -98,7 +99,7 @@ func SetupTestGrid(n int) (*TestGrid, error) { res.Listeners = append(res.Listeners, listeners[i]) res.Mux = append(res.Mux, m) } - close(ready) + xioutil.SafeClose(ready) for _, m := range res.Managers { for _, remote := range m.Targets() { if err := m.Connection(remote).WaitForConnect(ctx); err != nil { diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go index 902f55312..2143baa23 100644 --- a/internal/grid/handlers.go +++ b/internal/grid/handlers.go @@ -26,6 +26,7 @@ import ( "sync" "github.com/minio/minio/internal/hash/sha256" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/tinylib/msgp/msgp" ) @@ -579,7 +580,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func // Don't add extra buffering inT = make(chan Req) go func() { - defer close(inT) + defer xioutil.SafeClose(inT) for { select { case <-ctx.Done(): @@ -607,7 +608,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func outT := make(chan Resp) outDone := make(chan struct{}) go func() { - defer close(outDone) + defer xioutil.SafeClose(outDone) dropOutput := false for v := range outT { if dropOutput { @@ -629,7 +630,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func } }() rErr := handle(ctx, plT, inT, outT) - close(outT) + xioutil.SafeClose(outT) <-outDone return rErr }, OutCapacity: h.OutCapacity, InCapacity: h.InCapacity, Subroute: strings.Join(subroute, "/"), @@ -695,7 +696,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre return nil, fmt.Errorf("internal error: stream request channel nil") } go func() { - defer close(stream.Requests) + defer xioutil.SafeClose(stream.Requests) for req := range reqT { b, err := req.MarshalMsg(GetByteBuffer()[:0]) if err != nil { @@ -706,7 +707,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre } }() } else if stream.Requests != nil { - close(stream.Requests) + xioutil.SafeClose(stream.Requests) } return &TypedStream[Req, Resp]{responses: stream, newResp: h.NewResponse, Requests: reqT}, nil diff --git a/internal/grid/muxclient.go b/internal/grid/muxclient.go index 3ea50ceae..9425a86a6 100644 --- a/internal/grid/muxclient.go +++ b/internal/grid/muxclient.go @@ -26,6 +26,7 @@ import ( "sync/atomic" "time" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/zeebo/xxh3" ) @@ -267,7 +268,7 @@ func (m *muxClient) handleOneWayStream(start time.Time, respHandler chan<- Respo fmt.Println("Mux", m.MuxID, "Request took", time.Since(start).Round(time.Millisecond)) }() } - defer close(respHandler) + defer xioutil.SafeClose(respHandler) var pingTimer <-chan time.Time if m.deadline == 0 || m.deadline > clientPingInterval { ticker := time.NewTicker(clientPingInterval) @@ -324,7 +325,7 @@ func (m *muxClient) handleOneWayStream(start time.Time, respHandler chan<- Respo func (m *muxClient) handleTwowayResponses(responseCh chan Response, responses chan Response) { defer m.parent.deleteMux(false, m.MuxID) - defer close(responseCh) + defer xioutil.SafeClose(responseCh) for resp := range responses { responseCh <- resp m.send(message{Op: OpUnblockSrvMux, MuxID: m.MuxID}) @@ -534,7 +535,7 @@ func (m *muxClient) closeLocked() { return } if m.respWait != nil { - close(m.respWait) + xioutil.SafeClose(m.respWait) m.respWait = nil } m.closed = true diff --git a/internal/grid/muxserver.go b/internal/grid/muxserver.go index 907722462..f59085010 100644 --- a/internal/grid/muxserver.go +++ b/internal/grid/muxserver.go @@ -26,6 +26,7 @@ import ( "sync/atomic" "time" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" ) @@ -117,7 +118,7 @@ func newMuxStream(ctx context.Context, msg message, c *Connection, handler Strea m.inbound = make(chan []byte, inboundCap) handlerIn = make(chan []byte, 1) go func(inbound <-chan []byte) { - defer close(handlerIn) + defer xioutil.SafeClose(handlerIn) // Send unblocks when we have delivered the message to the handler. for in := range inbound { handlerIn <- in @@ -146,7 +147,7 @@ func newMuxStream(ctx context.Context, msg message, c *Connection, handler Strea if debugPrint { fmt.Println("muxServer: Mux", m.ID, "Returned with", handlerErr) } - close(send) + xioutil.SafeClose(send) }() // handlerErr is guarded by 'send' channel. handlerErr = handler.Handle(ctx, msg.Payload, handlerIn, send) @@ -247,7 +248,7 @@ func (m *muxServer) message(msg message) { logger.LogIf(m.ctx, fmt.Errorf("muxServer: EOF message with payload")) } if m.inbound != nil { - close(m.inbound) + xioutil.SafeClose(m.inbound) m.inbound = nil } return @@ -324,12 +325,9 @@ func (m *muxServer) close() { m.cancel() m.recvMu.Lock() defer m.recvMu.Unlock() - if m.inbound != nil { - close(m.inbound) - m.inbound = nil - } - if m.outBlock != nil { - close(m.outBlock) - m.outBlock = nil - } + xioutil.SafeClose(m.inbound) + m.inbound = nil + + xioutil.SafeClose(m.outBlock) + m.outBlock = nil } diff --git a/internal/ioutil/ioutil.go b/internal/ioutil/ioutil.go index 3948158fb..2cd4136c5 100644 --- a/internal/ioutil/ioutil.go +++ b/internal/ioutil/ioutil.go @@ -25,6 +25,7 @@ import ( "errors" "io" "os" + "runtime/debug" "sync" "time" @@ -418,3 +419,13 @@ func CopyAligned(w io.Writer, r io.Reader, alignedBuf []byte, totalSize int64, f } } } + +// SafeClose safely closes any channel of any type +func SafeClose[T any](c chan<- T) { + if c != nil { + close(c) + } + // Print stack to check who is sending `c` as `nil` + // without crashing the server. + debug.PrintStack() +} diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 24e16449a..cc58c4c83 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -34,6 +34,7 @@ import ( "time" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger/target/types" "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" @@ -443,7 +444,7 @@ func (h *Target) Cancel() { // and finish the existing ones. // All future ones will be discarded. h.logChMu.Lock() - close(h.logCh) + xioutil.SafeClose(h.logCh) h.logCh = nil h.logChMu.Unlock() diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index c31c333ce..f96976435 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -34,6 +34,7 @@ import ( "github.com/IBM/sarama" saramatls "github.com/IBM/sarama/tools/tls" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger/target/types" "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" @@ -402,7 +403,7 @@ func (h *Target) Cancel() { // and finish the existing ones. // All future ones will be discarded. h.logChMu.Lock() - close(h.logCh) + xioutil.SafeClose(h.logCh) h.logCh = nil h.logChMu.Unlock() diff --git a/internal/store/store.go b/internal/store/store.go index 4eef6db28..4f5b6a60e 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -24,6 +24,7 @@ import ( "strings" "time" + xioutil "github.com/minio/minio/internal/ioutil" xnet "github.com/minio/pkg/v2/net" ) @@ -65,7 +66,7 @@ func replayItems[I any](store Store[I], doneCh <-chan struct{}, log logger, id s keyCh := make(chan Key) go func() { - defer close(keyCh) + defer xioutil.SafeClose(keyCh) retryTicker := time.NewTicker(retryInterval) defer retryTicker.Stop()