avoid close 'nil' panics if any (#18890)

brings a generic implementation that
prints a stack trace for 'nil' channel
closes(), if not safely closes it.
This commit is contained in:
Harshavardhana 2024-01-28 10:04:17 -08:00 committed by GitHub
parent 38de8e6936
commit 1d3bd02089
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
47 changed files with 150 additions and 104 deletions

View File

@ -2657,7 +2657,7 @@ func fetchHealthInfo(healthCtx context.Context, objectAPI ObjectLayer, query *ur
}
go func() {
defer close(healthInfoCh)
defer xioutil.SafeClose(healthInfoCh)
partialWrite(healthInfo) // Write first message with only version and deployment id populated
getAndWritePlatformInfo()
@ -3046,7 +3046,7 @@ func getClusterMetaInfo(ctx context.Context) []byte {
resultCh := make(chan madmin.ClusterRegistrationInfo)
go func() {
defer close(resultCh)
defer xioutil.SafeClose(resultCh)
ci := madmin.ClusterRegistrationInfo{}
ci.Info.NoOfServerPools = len(globalEndpoints)

View File

@ -28,6 +28,7 @@ import (
"time"
"github.com/minio/madmin-go/v3"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
)
@ -800,7 +801,7 @@ func (h *healSequence) healItems(objAPI ObjectLayer, bucketsOnly bool) error {
func (h *healSequence) traverseAndHeal(objAPI ObjectLayer) {
bucketsOnly := false // Heals buckets and objects also.
h.traverseAndHealDoneCh <- h.healItems(objAPI, bucketsOnly)
close(h.traverseAndHealDoneCh)
xioutil.SafeClose(h.traverseAndHealDoneCh)
}
// healMinioSysMeta - heals all files under a given meta prefix, returns a function

View File

@ -32,6 +32,7 @@ import (
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/bucket/versioning"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/env"
"github.com/minio/pkg/v2/wildcard"
@ -648,7 +649,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
case expireCh <- toDel:
}
}
close(expireCh)
xioutil.SafeClose(expireCh)
wk.Wait() // waits for all expire goroutines to complete
@ -658,7 +659,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
// Close the saverQuitCh - this also triggers saving in-memory state
// immediately one last time before we exit this method.
close(saverQuitCh)
xioutil.SafeClose(saverQuitCh)
// Notify expire jobs final status to the configured endpoint
buf, _ := json.Marshal(ri)

View File

@ -47,6 +47,7 @@ import (
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/ioutil"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/console"
"github.com/minio/pkg/v2/env"
@ -545,7 +546,7 @@ func (r BatchJobReplicateV1) writeAsArchive(ctx context.Context, objAPI ObjectLa
}
go func() {
defer close(input)
defer xioutil.SafeClose(input)
for _, entry := range entries {
gr, err := objAPI.GetObjectNInfo(ctx, r.Source.Bucket,
@ -1038,7 +1039,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() {
go func() {
defer close(slowCh)
defer xioutil.SafeClose(slowCh)
// Snowball currently needs the high level minio-go Client, not the Core one
cl, err := miniogo.New(u.Host, &miniogo.Options{
@ -1809,7 +1810,7 @@ func (j *BatchJobPool) queueJob(req *BatchJobRequest) error {
select {
case <-j.ctx.Done():
j.once.Do(func() {
close(j.jobCh)
xioutil.SafeClose(j.jobCh)
})
case j.jobCh <- req:
default:

View File

@ -39,6 +39,7 @@ import (
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/event"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/s3select"
"github.com/minio/pkg/v2/env"
@ -118,8 +119,8 @@ func (es *expiryState) PendingTasks() int {
// close closes work channels exactly once.
func (es *expiryState) close() {
es.once.Do(func() {
close(es.byDaysCh)
close(es.byNewerNoncurrentCh)
xioutil.SafeClose(es.byDaysCh)
xioutil.SafeClose(es.byNewerNoncurrentCh)
})
}

View File

@ -47,6 +47,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/tinylib/msgp/msgp"
"github.com/zeebo/xxh3"
@ -1894,7 +1895,7 @@ func (p *ReplicationPool) AddLargeWorkers() {
go func() {
<-p.ctx.Done()
for i := 0; i < LargeWorkerCount; i++ {
close(p.lrgworkers[i])
xioutil.SafeClose(p.lrgworkers[i])
}
}()
}
@ -1953,7 +1954,7 @@ func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
for len(p.workers) > n {
worker := p.workers[len(p.workers)-1]
p.workers = p.workers[:len(p.workers)-1]
close(worker)
xioutil.SafeClose(worker)
}
}
@ -2755,7 +2756,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
}
workers := make([]chan ReplicateObjectInfo, resyncParallelRoutines)
resultCh := make(chan TargetReplicationResyncStatus, 1)
defer close(resultCh)
defer xioutil.SafeClose(resultCh)
go func() {
for r := range resultCh {
s.incStats(r, opts)
@ -2867,7 +2868,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
}
}
for i := 0; i < resyncParallelRoutines; i++ {
close(workers[i])
xioutil.SafeClose(workers[i])
}
wg.Wait()
resyncStatus = ResyncCompleted
@ -3123,7 +3124,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string,
}
diffCh := make(chan madmin.DiffInfo, 4000)
go func() {
defer close(diffCh)
defer xioutil.SafeClose(diffCh)
for obj := range objInfoCh {
if contextCanceled(ctx) {
// Just consume input...
@ -3316,7 +3317,7 @@ func (p *ReplicationPool) persistMRF() {
mTimer.Reset(mrfSaveInterval)
case <-p.ctx.Done():
p.mrfStopCh <- struct{}{}
close(p.mrfSaveCh)
xioutil.SafeClose(p.mrfSaveCh)
// We try to save if possible, but we don't care beyond that.
saveMRFToDisk()
return
@ -3551,7 +3552,7 @@ func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch <-chan
mrfCh := make(chan madmin.ReplicationMRF, 100)
go func() {
defer close(mrfCh)
defer xioutil.SafeClose(mrfCh)
for vID, e := range mrfRec.Entries {
if bucket != "" && e.Bucket != bucket {
continue

View File

@ -40,6 +40,7 @@ import (
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config/heal"
"github.com/minio/minio/internal/event"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/console"
uatomic "go.uber.org/atomic"
@ -1479,7 +1480,7 @@ func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
return nil
}
// Update values and cycle waiting.
close(d.cycle)
xioutil.SafeClose(d.cycle)
d.factor = factor
d.maxSleep = maxWait
d.cycle = make(chan struct{})

View File

@ -25,6 +25,7 @@ import (
"sync"
"sync/atomic"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
)
@ -118,7 +119,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
}
readTriggerCh := make(chan bool, len(p.readers))
defer close(readTriggerCh) // close the channel upon return
defer xioutil.SafeClose(readTriggerCh) // close the channel upon return
for i := 0; i < p.dataBlocks; i++ {
// Setup read triggers for p.dataBlocks number of reads so that it reads in parallel.

View File

@ -760,7 +760,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
}
mrfCheck := make(chan FileInfo)
defer close(mrfCheck)
defer xioutil.SafeClose(mrfCheck)
var rw sync.Mutex
@ -810,7 +810,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
}
wg.Wait()
close(done)
xioutil.SafeClose(done)
fi, ok := <-mrfCheck
if !ok {

View File

@ -34,6 +34,7 @@ import (
"github.com/lithammer/shortuuid/v4"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/hash"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/env"
)
@ -372,7 +373,7 @@ func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
doneCh := make(chan struct{})
defer close(doneCh)
defer xioutil.SafeClose(doneCh)
// Save rebalance.bin periodically.
go func() {

View File

@ -40,6 +40,7 @@ import (
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/bpool"
"github.com/minio/minio/internal/config/storageclass"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/sync/errgroup"
"github.com/minio/pkg/v2/wildcard"
@ -653,7 +654,7 @@ func (z *erasureServerPools) StorageInfo(ctx context.Context, metrics bool) Stor
func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error {
// Updates must be closed before we return.
defer close(updates)
defer xioutil.SafeClose(updates)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -680,7 +681,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataU
results = append(results, dataUsageCache{})
go func(i int, erObj *erasureObjects) {
updates := make(chan dataUsageCache, 1)
defer close(updates)
defer xioutil.SafeClose(updates)
// Start update collector.
go func() {
defer wg.Done()
@ -739,7 +740,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataU
return
case v := <-updateCloser:
update()
close(v)
xioutil.SafeClose(v)
return
case <-updateTicker.C:
update()
@ -1957,7 +1958,7 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error {
if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil {
// Upon error close the channel.
close(results)
xioutil.SafeClose(results)
return err
}
@ -1966,7 +1967,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
ctx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
defer close(results)
defer xioutil.SafeClose(results)
for _, erasureSet := range z.serverPools {
var wg sync.WaitGroup

View File

@ -37,6 +37,7 @@ import (
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/dsync"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/console"
"github.com/minio/pkg/v2/sync/errgroup"
@ -667,10 +668,10 @@ func (s *erasureSets) Shutdown(ctx context.Context) error {
select {
case _, ok := <-s.setReconnectEvent:
if ok {
close(s.setReconnectEvent)
xioutil.SafeClose(s.setReconnectEvent)
}
default:
close(s.setReconnectEvent)
xioutil.SafeClose(s.setReconnectEvent)
}
return nil
}

View File

@ -423,7 +423,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
bucketCh <- b
}
}
close(bucketCh)
xioutil.SafeClose(bucketCh)
bucketResults := make(chan dataUsageEntryInfo, len(disks))
@ -560,7 +560,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
}(i)
}
wg.Wait()
close(bucketResults)
xioutil.SafeClose(bucketResults)
saverWg.Wait()
return nil

View File

@ -32,6 +32,7 @@ import (
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio/internal/auth"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
ftp "goftp.io/server/v2"
)
@ -386,7 +387,7 @@ func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, path string) (err error) {
// Send object names that are needed to be removed to objectsCh
go func() {
defer close(objectsCh)
defer xioutil.SafeClose(objectsCh)
opts := minio.ListObjectsOptions{
Prefix: prefix,
Recursive: true,

View File

@ -28,6 +28,7 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config/storageclass"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/console"
"github.com/minio/pkg/v2/wildcard"
@ -443,7 +444,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
finished: nil,
})
jt.Wait() // synchronize all the concurrent heal jobs
close(results)
xioutil.SafeClose(results)
if err != nil {
// Set this such that when we return this function
// we let the caller retry this disk again for the

View File

@ -31,6 +31,7 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/config"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
)
@ -575,7 +576,7 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri
ch := make(chan itemOrErr)
go func() {
defer close(ch)
defer xioutil.SafeClose(ch)
// Allocate new results channel to receive ObjectInfo.
objInfoCh := make(chan ObjectInfo)

View File

@ -45,6 +45,7 @@ import (
"github.com/minio/minio/internal/config/policy/opa"
polplugin "github.com/minio/minio/internal/config/policy/plugin"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/jwt"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/policy"
@ -207,7 +208,7 @@ func (sys *IAMSys) Load(ctx context.Context, firstTime bool) error {
select {
case <-sys.configLoaded:
default:
close(sys.configLoaded)
xioutil.SafeClose(sys.configLoaded)
}
return nil
}

View File

@ -26,6 +26,7 @@ import (
"sort"
"strings"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/console"
)
@ -659,7 +660,7 @@ func (m *metaCacheEntriesSorted) forwardPast(s string) {
// If the context is canceled the function will return the error,
// otherwise the function will return nil.
func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<- metaCacheEntry, readQuorum int) error {
defer close(out)
defer xioutil.SafeClose(out)
top := make([]*metaCacheEntry, len(in))
nDone := 0
ctxDone := ctx.Done()

View File

@ -28,6 +28,7 @@ import (
"sync"
"time"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
)
@ -339,7 +340,7 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
// When 'in' is closed or the context is canceled the
// function closes 'out' and exits.
func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
defer close(out)
defer xioutil.SafeClose(out)
for {
var obj metaCacheEntry
@ -472,16 +473,16 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions
funcReturnedMu.Unlock()
outCh <- entry
if returned {
close(outCh)
xioutil.SafeClose(outCh)
}
}
entry.reusable = returned
saveCh <- entry
}
if !returned {
close(outCh)
xioutil.SafeClose(outCh)
}
close(saveCh)
xioutil.SafeClose(saveCh)
}()
return filteredResults()

View File

@ -37,6 +37,7 @@ import (
"github.com/minio/minio/internal/bucket/versioning"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/hash"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/console"
)
@ -679,7 +680,7 @@ func getQuorumDisks(disks []StorageAPI, infos []DiskInfo, readQuorum int) (newDi
// Will return io.EOF if continuing would not yield more results.
func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) {
defer close(results)
defer xioutil.SafeClose(results)
o.debugf(color.Green("listPath:")+" with options: %#v", o)
// get prioritized non-healing disks for listing

View File

@ -27,6 +27,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/klauspost/compress/s2"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/tinylib/msgp/msgp"
"github.com/valyala/bytebufferpool"
@ -550,7 +551,7 @@ func (r *metacacheReader) readAll(ctx context.Context, dst chan<- metaCacheEntry
if r.err != nil {
return r.err
}
defer close(dst)
defer xioutil.SafeClose(dst)
if r.current.name != "" {
select {
case <-ctx.Done():

View File

@ -94,7 +94,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if err != nil {
return err
}
defer close(out)
defer xioutil.SafeClose(out)
var objsReturned int
objReturned := func(metadata []byte) {

View File

@ -32,6 +32,7 @@ import (
"github.com/minio/kes-go"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/bucket/lifecycle"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/mcontext"
"github.com/minio/minio/internal/rest"
@ -1724,7 +1725,7 @@ func getGoMetrics() *MetricsGroup {
func getHistogramMetrics(hist *prometheus.HistogramVec, desc MetricDescription) []Metric {
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
defer xioutil.SafeClose(ch)
// Collects prometheus metrics from hist and sends it over ch
hist.Collect(ch)
}()
@ -3881,7 +3882,7 @@ func (c *minioClusterCollector) Collect(out chan<- prometheus.Metric) {
func ReportMetrics(ctx context.Context, metricsGroups []*MetricsGroup) <-chan Metric {
ch := make(chan Metric)
go func() {
defer close(ch)
defer xioutil.SafeClose(ch)
populateAndPublish(metricsGroups, func(m Metric) bool {
if m.VariableLabels == nil {
m.VariableLabels = make(map[string]string)

View File

@ -33,6 +33,7 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/klauspost/compress/zip"
"github.com/minio/madmin-go/v3"
xioutil "github.com/minio/minio/internal/ioutil"
xnet "github.com/minio/pkg/v2/net"
"github.com/minio/pkg/v2/sync/errgroup"
"github.com/minio/pkg/v2/workers"
@ -1263,7 +1264,7 @@ func (sys *NotificationSys) collectPeerMetrics(ctx context.Context, peerChannels
}
go func(wg *sync.WaitGroup, ch chan Metric) {
wg.Wait()
close(ch)
xioutil.SafeClose(ch)
}(&wg, ch)
return ch
}
@ -1488,7 +1489,7 @@ func (sys *NotificationSys) DriveSpeedTest(ctx context.Context, opts madmin.Driv
go func(wg *sync.WaitGroup, ch chan madmin.DriveSpeedTestResult) {
wg.Wait()
close(ch)
xioutil.SafeClose(ch)
}(&wg, ch)
return ch
@ -1616,7 +1617,7 @@ func (sys *NotificationSys) GetReplicationMRF(ctx context.Context, bucket, node
}(mrfCh)
go func(wg *sync.WaitGroup) {
wg.Wait()
close(mrfCh)
xioutil.SafeClose(mrfCh)
}(&wg)
return mrfCh, nil
}

View File

@ -46,6 +46,7 @@ import (
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/ioutil"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/trie"
"github.com/minio/pkg/v2/wildcard"
@ -1081,7 +1082,7 @@ func newS2CompressReader(r io.Reader, on int64, encrypted bool) (rc io.ReadClose
comp := s2.NewWriter(pw, opts...)
indexCh := make(chan []byte, 1)
go func() {
defer close(indexCh)
defer xioutil.SafeClose(indexCh)
cn, err := io.Copy(comp, r)
if err != nil {
comp.Close()

View File

@ -34,6 +34,7 @@ import (
"github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/event"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/v2/logger/message/log"
@ -253,7 +254,7 @@ func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Me
go func(ch chan<- Metric) {
defer func() {
xhttp.DrainBody(respBody)
close(ch)
xioutil.SafeClose(ch)
}()
for {
var metric Metric
@ -625,7 +626,7 @@ func (client *peerRESTClient) doTrace(traceCh chan<- madmin.TraceInfo, doneCh <-
ctx, cancel := context.WithCancel(GlobalContext)
cancelCh := make(chan struct{})
defer close(cancelCh)
defer xioutil.SafeClose(cancelCh)
go func() {
select {
case <-doneCh:
@ -663,7 +664,7 @@ func (client *peerRESTClient) doListen(listenCh chan<- event.Event, doneCh <-cha
ctx, cancel := context.WithCancel(GlobalContext)
cancelCh := make(chan struct{})
defer close(cancelCh)
defer xioutil.SafeClose(cancelCh)
go func() {
select {
case <-doneCh:
@ -733,7 +734,7 @@ func (client *peerRESTClient) doConsoleLog(logCh chan log.Info, doneCh <-chan st
ctx, cancel := context.WithCancel(GlobalContext)
cancelCh := make(chan struct{})
defer close(cancelCh)
defer xioutil.SafeClose(cancelCh)
go func() {
select {
case <-doneCh:
@ -860,7 +861,7 @@ func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric
go func(ch chan<- Metric) {
defer func() {
xhttp.DrainBody(respBody)
close(ch)
xioutil.SafeClose(ch)
}()
for {
var metric Metric
@ -887,7 +888,7 @@ func (client *peerRESTClient) GetPeerBucketMetrics(ctx context.Context) (<-chan
go func(ch chan<- Metric) {
defer func() {
xhttp.DrainBody(respBody)
close(ch)
xioutil.SafeClose(ch)
}()
for {
var metric Metric
@ -1025,7 +1026,7 @@ func (client *peerRESTClient) GetReplicationMRF(ctx context.Context, bucket stri
go func(ch chan madmin.ReplicationMRF) {
defer func() {
xhttp.DrainBody(respBody)
close(ch)
xioutil.SafeClose(ch)
}()
for {
var entry madmin.ReplicationMRF

View File

@ -1237,7 +1237,7 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques
}
doneCh := make(chan struct{})
defer close(doneCh)
defer xioutil.SafeClose(doneCh)
ch := make(chan log.Info, 100000)
err := globalConsoleSys.Subscribe(ch, doneCh, "", 0, madmin.LogMaskAll, nil)
@ -1298,7 +1298,7 @@ func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) {
bucketsString := r.Form.Get("buckets")
doneCh := make(chan struct{})
defer close(doneCh)
defer xioutil.SafeClose(doneCh)
selectBuckets := b.SelectBuckets(strings.Split(bucketsString, ",")...)
report := globalBucketMonitor.GetReport(selectBuckets)

View File

@ -318,7 +318,7 @@ func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResu
}
time.Sleep(duration)
close(r.eof)
xioutil.SafeClose(r.eof)
wg.Wait()
for {
if globalNetPerfRX.ActiveConnections() == 0 {
@ -376,7 +376,7 @@ func siteNetperf(ctx context.Context, duration time.Duration) madmin.SiteNetPerf
}
time.Sleep(duration)
close(r.eof)
xioutil.SafeClose(r.eof)
wg.Wait()
for {
if globalSiteNetPerfRX.ActiveConnections() == 0 || contextCanceled(ctx) {

View File

@ -47,6 +47,7 @@ import (
"github.com/minio/minio/internal/handlers"
"github.com/minio/minio/internal/hash/sha256"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/certs"
"github.com/minio/pkg/v2/env"
@ -737,7 +738,7 @@ func serverMain(ctx *cli.Context) {
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
}
// Allow grid to start after registering all services.
close(globalGridStart)
xioutil.SafeClose(globalGridStart)
httpServer := xhttp.NewServer(getServerListenAddrs()).
UseHandler(setCriticalErrorHandler(corsHandler(handler))).

View File

@ -23,6 +23,8 @@ import (
"os/exec"
"runtime"
"syscall"
xioutil "github.com/minio/minio/internal/ioutil"
)
// Type of service signals currently supported.
@ -109,7 +111,7 @@ func unfreezeServices() {
if val := globalServiceFreeze.Swap(_ch); val != nil {
if ch, ok := val.(chan struct{}); ok && ch != nil {
// Close previous non-nil channel.
close(ch)
xioutil.SafeClose(ch)
}
}
globalServiceFreezeCnt = 0 // Don't risk going negative.

View File

@ -32,6 +32,7 @@ import (
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio/internal/auth"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
@ -357,7 +358,7 @@ func (f *sftpDriver) Filecmd(r *sftp.Request) (err error) {
// Send object names that are needed to be removed to objectsCh
go func() {
defer close(objectsCh)
defer xioutil.SafeClose(objectsCh)
opts := minio.ListObjectsOptions{
Prefix: prefix,
Recursive: true,

View File

@ -27,6 +27,7 @@ import (
"github.com/minio/dperf/pkg/dperf"
"github.com/minio/madmin-go/v3"
xioutil "github.com/minio/minio/internal/ioutil"
)
const speedTest = "speedtest"
@ -46,7 +47,7 @@ type speedTestOpts struct {
func objectSpeedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestResult {
ch := make(chan madmin.SpeedTestResult, 1)
go func() {
defer close(ch)
defer xioutil.SafeClose(ch)
concurrency := opts.concurrencyStart

View File

@ -23,6 +23,7 @@ import (
"time"
"github.com/minio/madmin-go/v3"
xioutil "github.com/minio/minio/internal/ioutil"
)
// StorageAPI interface.
@ -280,7 +281,7 @@ func (p *unrecognizedDisk) StatInfoFile(ctx context.Context, volume, path string
}
func (p *unrecognizedDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
close(resp)
xioutil.SafeClose(resp)
return errDiskNotFound
}

View File

@ -37,6 +37,7 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/grid"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
xnet "github.com/minio/pkg/v2/net"
@ -232,7 +233,7 @@ func (client *storageRESTClient) Healing() *healingTracker {
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, _ func() bool) (dataUsageCache, error) {
atomic.AddInt32(&client.scanning, 1)
defer atomic.AddInt32(&client.scanning, -1)
defer close(updates)
defer xioutil.SafeClose(updates)
st, err := storageNSScannerHandler.Call(ctx, client.gridConn, &nsScannerOptions{
DiskID: client.diskID,
@ -771,7 +772,7 @@ func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path
// The resp channel is closed before the call returns.
// Only a canceled context or network errors returns an error.
func (client *storageRESTClient) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
defer close(resp)
defer xioutil.SafeClose(resp)
body, err := req.MarshalMsg(nil)
if err != nil {
return err

View File

@ -778,7 +778,7 @@ func (c *closeNotifier) Read(p []byte) (n int, err error) {
n, err = c.rc.Read(p)
if err != nil {
if c.done != nil {
close(c.done)
xioutil.SafeClose(c.done)
c.done = nil
}
}
@ -787,7 +787,7 @@ func (c *closeNotifier) Read(p []byte) (n int, err error) {
func (c *closeNotifier) Close() error {
if c.done != nil {
close(c.done)
xioutil.SafeClose(c.done)
c.done = nil
}
return c.rc.Close()
@ -826,10 +826,10 @@ func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func
} else {
write([]byte{0})
}
close(doneCh)
xioutil.SafeClose(doneCh)
return
}
defer close(doneCh)
defer xioutil.SafeClose(doneCh)
// Initiate ticker after body has been read.
ticker := time.NewTicker(time.Second * 10)
for {
@ -889,7 +889,7 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
}
}
}
defer close(doneCh)
defer xioutil.SafeClose(doneCh)
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
@ -1027,7 +1027,7 @@ func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
} else {
write([]byte{0})
}
close(doneCh)
xioutil.SafeClose(doneCh)
return
case block := <-blockCh:
var tmp [5]byte

View File

@ -279,12 +279,12 @@ func (p *xlStorageDiskIDCheck) Healing() *healingTracker {
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, _ func() bool) (dataUsageCache, error) {
if contextCanceled(ctx) {
close(updates)
xioutil.SafeClose(updates)
return dataUsageCache{}, ctx.Err()
}
if err := p.checkDiskStale(); err != nil {
close(updates)
xioutil.SafeClose(updates)
return dataUsageCache{}, err
}
@ -733,7 +733,7 @@ func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path st
func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) (err error) {
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadMultiple, req.Bucket, req.Prefix)
if err != nil {
close(resp)
xioutil.SafeClose(resp)
return err
}
defer done(&err)

View File

@ -491,7 +491,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
}()
// Updates must be closed before we return.
defer close(updates)
defer xioutil.SafeClose(updates)
var lc *lifecycle.Lifecycle
// Check if the current bucket has a configured lifecycle policy
@ -2803,7 +2803,7 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
// The resp channel is closed before the call returns.
// Only a canceled context will return an error.
func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
defer close(resp)
defer xioutil.SafeClose(resp)
volumeDir := pathJoin(s.drivePath, req.Bucket)
found := 0

View File

@ -26,6 +26,7 @@ import (
"sync"
"time"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/mcontext"
"github.com/minio/pkg/v2/console"
"github.com/minio/pkg/v2/env"
@ -406,7 +407,7 @@ func refreshLock(ctx context.Context, ds *Dsync, id, source string, quorum int)
// We may have some unused results in ch, release them async.
go func() {
wg.Wait()
close(ch)
xioutil.SafeClose(ch)
for range ch {
}
}()
@ -528,7 +529,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
// We may have some unused results in ch, release them async.
go func() {
wg.Wait()
close(ch)
xioutil.SafeClose(ch)
for grantToBeReleased := range ch {
if grantToBeReleased.isLocked() {
// release abandoned lock

View File

@ -40,6 +40,7 @@ import (
"github.com/gobwas/ws/wsutil"
"github.com/google/uuid"
"github.com/minio/madmin-go/v3"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/pubsub"
"github.com/tinylib/msgp/msgp"
@ -449,7 +450,7 @@ func (c *Connection) WaitForConnect(ctx context.Context) error {
defer cancel()
changed := make(chan State, 1)
go func() {
defer close(changed)
defer xioutil.SafeClose(changed)
for {
c.connChange.Wait()
newState := c.State()

View File

@ -26,6 +26,7 @@ import (
"sync"
"time"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/mux"
)
@ -98,7 +99,7 @@ func SetupTestGrid(n int) (*TestGrid, error) {
res.Listeners = append(res.Listeners, listeners[i])
res.Mux = append(res.Mux, m)
}
close(ready)
xioutil.SafeClose(ready)
for _, m := range res.Managers {
for _, remote := range m.Targets() {
if err := m.Connection(remote).WaitForConnect(ctx); err != nil {

View File

@ -26,6 +26,7 @@ import (
"sync"
"github.com/minio/minio/internal/hash/sha256"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/tinylib/msgp/msgp"
)
@ -579,7 +580,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func
// Don't add extra buffering
inT = make(chan Req)
go func() {
defer close(inT)
defer xioutil.SafeClose(inT)
for {
select {
case <-ctx.Done():
@ -607,7 +608,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func
outT := make(chan Resp)
outDone := make(chan struct{})
go func() {
defer close(outDone)
defer xioutil.SafeClose(outDone)
dropOutput := false
for v := range outT {
if dropOutput {
@ -629,7 +630,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func
}
}()
rErr := handle(ctx, plT, inT, outT)
close(outT)
xioutil.SafeClose(outT)
<-outDone
return rErr
}, OutCapacity: h.OutCapacity, InCapacity: h.InCapacity, Subroute: strings.Join(subroute, "/"),
@ -695,7 +696,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre
return nil, fmt.Errorf("internal error: stream request channel nil")
}
go func() {
defer close(stream.Requests)
defer xioutil.SafeClose(stream.Requests)
for req := range reqT {
b, err := req.MarshalMsg(GetByteBuffer()[:0])
if err != nil {
@ -706,7 +707,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre
}
}()
} else if stream.Requests != nil {
close(stream.Requests)
xioutil.SafeClose(stream.Requests)
}
return &TypedStream[Req, Resp]{responses: stream, newResp: h.NewResponse, Requests: reqT}, nil

View File

@ -26,6 +26,7 @@ import (
"sync/atomic"
"time"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/zeebo/xxh3"
)
@ -267,7 +268,7 @@ func (m *muxClient) handleOneWayStream(start time.Time, respHandler chan<- Respo
fmt.Println("Mux", m.MuxID, "Request took", time.Since(start).Round(time.Millisecond))
}()
}
defer close(respHandler)
defer xioutil.SafeClose(respHandler)
var pingTimer <-chan time.Time
if m.deadline == 0 || m.deadline > clientPingInterval {
ticker := time.NewTicker(clientPingInterval)
@ -324,7 +325,7 @@ func (m *muxClient) handleOneWayStream(start time.Time, respHandler chan<- Respo
func (m *muxClient) handleTwowayResponses(responseCh chan Response, responses chan Response) {
defer m.parent.deleteMux(false, m.MuxID)
defer close(responseCh)
defer xioutil.SafeClose(responseCh)
for resp := range responses {
responseCh <- resp
m.send(message{Op: OpUnblockSrvMux, MuxID: m.MuxID})
@ -534,7 +535,7 @@ func (m *muxClient) closeLocked() {
return
}
if m.respWait != nil {
close(m.respWait)
xioutil.SafeClose(m.respWait)
m.respWait = nil
}
m.closed = true

View File

@ -26,6 +26,7 @@ import (
"sync/atomic"
"time"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
)
@ -117,7 +118,7 @@ func newMuxStream(ctx context.Context, msg message, c *Connection, handler Strea
m.inbound = make(chan []byte, inboundCap)
handlerIn = make(chan []byte, 1)
go func(inbound <-chan []byte) {
defer close(handlerIn)
defer xioutil.SafeClose(handlerIn)
// Send unblocks when we have delivered the message to the handler.
for in := range inbound {
handlerIn <- in
@ -146,7 +147,7 @@ func newMuxStream(ctx context.Context, msg message, c *Connection, handler Strea
if debugPrint {
fmt.Println("muxServer: Mux", m.ID, "Returned with", handlerErr)
}
close(send)
xioutil.SafeClose(send)
}()
// handlerErr is guarded by 'send' channel.
handlerErr = handler.Handle(ctx, msg.Payload, handlerIn, send)
@ -247,7 +248,7 @@ func (m *muxServer) message(msg message) {
logger.LogIf(m.ctx, fmt.Errorf("muxServer: EOF message with payload"))
}
if m.inbound != nil {
close(m.inbound)
xioutil.SafeClose(m.inbound)
m.inbound = nil
}
return
@ -324,12 +325,9 @@ func (m *muxServer) close() {
m.cancel()
m.recvMu.Lock()
defer m.recvMu.Unlock()
if m.inbound != nil {
close(m.inbound)
xioutil.SafeClose(m.inbound)
m.inbound = nil
}
if m.outBlock != nil {
close(m.outBlock)
xioutil.SafeClose(m.outBlock)
m.outBlock = nil
}
}

View File

@ -25,6 +25,7 @@ import (
"errors"
"io"
"os"
"runtime/debug"
"sync"
"time"
@ -418,3 +419,13 @@ func CopyAligned(w io.Writer, r io.Reader, alignedBuf []byte, totalSize int64, f
}
}
}
// SafeClose safely closes any channel of any type
func SafeClose[T any](c chan<- T) {
if c != nil {
close(c)
}
// Print stack to check who is sending `c` as `nil`
// without crashing the server.
debug.PrintStack()
}

View File

@ -34,6 +34,7 @@ import (
"time"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger/target/types"
"github.com/minio/minio/internal/once"
"github.com/minio/minio/internal/store"
@ -443,7 +444,7 @@ func (h *Target) Cancel() {
// and finish the existing ones.
// All future ones will be discarded.
h.logChMu.Lock()
close(h.logCh)
xioutil.SafeClose(h.logCh)
h.logCh = nil
h.logChMu.Unlock()

View File

@ -34,6 +34,7 @@ import (
"github.com/IBM/sarama"
saramatls "github.com/IBM/sarama/tools/tls"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger/target/types"
"github.com/minio/minio/internal/once"
"github.com/minio/minio/internal/store"
@ -402,7 +403,7 @@ func (h *Target) Cancel() {
// and finish the existing ones.
// All future ones will be discarded.
h.logChMu.Lock()
close(h.logCh)
xioutil.SafeClose(h.logCh)
h.logCh = nil
h.logChMu.Unlock()

View File

@ -24,6 +24,7 @@ import (
"strings"
"time"
xioutil "github.com/minio/minio/internal/ioutil"
xnet "github.com/minio/pkg/v2/net"
)
@ -65,7 +66,7 @@ func replayItems[I any](store Store[I], doneCh <-chan struct{}, log logger, id s
keyCh := make(chan Key)
go func() {
defer close(keyCh)
defer xioutil.SafeClose(keyCh)
retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop()