mirror of
https://github.com/minio/minio.git
synced 2025-01-12 07:23:23 -05:00
0c4be55936
Fixes two different types of problems - continuation of the problem seen in FS #9992 as not fixed for erasure coded deployments, reproduced this issue with spark and its fixed now - another issue was leaking walk go-routines which would lead to high memory usage and crash the system this is simply because all the walks which were purged at the top limit had leaking end walkers which would consume memory endlessly. closes #9966 closes #10088
212 lines
6.1 KiB
Go
212 lines
6.1 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"errors"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Global lookup timeout.
|
|
const (
|
|
globalLookupTimeout = time.Minute * 30 // 30minutes.
|
|
treeWalkEntryLimit = 50
|
|
treeWalkSameEntryLimit = 4
|
|
)
|
|
|
|
// listParams - list object params used for list object map
|
|
type listParams struct {
|
|
bucket string
|
|
recursive bool
|
|
marker string
|
|
prefix string
|
|
}
|
|
|
|
// errWalkAbort - returned by doTreeWalk() if it returns prematurely.
|
|
// doTreeWalk() can return prematurely if
|
|
// 1) treeWalk is timed out by the timer go-routine.
|
|
// 2) there is an error during tree walk.
|
|
var errWalkAbort = errors.New("treeWalk abort")
|
|
|
|
// treeWalk - represents the go routine that does the file tree walk.
|
|
type treeWalk struct {
|
|
added time.Time
|
|
resultCh chan TreeWalkResult
|
|
endWalkCh chan struct{} // To signal when treeWalk go-routine should end.
|
|
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
|
}
|
|
|
|
// TreeWalkPool - pool of treeWalk go routines.
|
|
// A treeWalk is added to the pool by Set() and removed either by
|
|
// doing a Release() or if the concerned timer goes off.
|
|
// treeWalkPool's purpose is to maintain active treeWalk go-routines in a map so that
|
|
// it can be looked up across related list calls.
|
|
type TreeWalkPool struct {
|
|
mu sync.Mutex
|
|
pool map[listParams][]treeWalk
|
|
timeOut time.Duration
|
|
}
|
|
|
|
// NewTreeWalkPool - initialize new tree walk pool.
|
|
func NewTreeWalkPool(timeout time.Duration) *TreeWalkPool {
|
|
tPool := &TreeWalkPool{
|
|
pool: make(map[listParams][]treeWalk),
|
|
timeOut: timeout,
|
|
}
|
|
return tPool
|
|
}
|
|
|
|
// Release - selects a treeWalk from the pool based on the input
|
|
// listParams, removes it from the pool, and returns the TreeWalkResult
|
|
// channel.
|
|
// Returns nil if listParams does not have an associated treeWalk.
|
|
func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, endWalkCh chan struct{}) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
walks, ok := t.pool[params] // Pick the valid walks.
|
|
if !ok || len(walks) == 0 {
|
|
// Release return nil if params not found.
|
|
return nil, nil
|
|
}
|
|
|
|
// Pop out the first valid walk entry.
|
|
walk := walks[0]
|
|
walks = walks[1:]
|
|
if len(walks) > 0 {
|
|
t.pool[params] = walks
|
|
} else {
|
|
delete(t.pool, params)
|
|
}
|
|
walk.endTimerCh <- struct{}{}
|
|
return walk.resultCh, walk.endWalkCh
|
|
}
|
|
|
|
// Set - adds a treeWalk to the treeWalkPool.
|
|
// Also starts a timer go-routine that ends when:
|
|
// 1) time.After() expires after t.timeOut seconds.
|
|
// The expiration is needed so that the treeWalk go-routine resources are freed after a timeout
|
|
// if the S3 client does only partial listing of objects.
|
|
// 2) Release() signals the timer go-routine to end on endTimerCh.
|
|
// During listing the timer should not timeout and end the treeWalk go-routine, hence the
|
|
// timer go-routine should be ended.
|
|
func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endWalkCh chan struct{}) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
// If we are above the limit delete at least one entry from the pool.
|
|
if len(t.pool) > treeWalkEntryLimit {
|
|
age := time.Now()
|
|
var oldest listParams
|
|
for k, v := range t.pool {
|
|
if len(v) == 0 {
|
|
delete(t.pool, k)
|
|
continue
|
|
}
|
|
// The first element is the oldest, so we only check that.
|
|
e := v[0]
|
|
if e.added.Before(age) {
|
|
oldest = k
|
|
age = e.added
|
|
}
|
|
}
|
|
// Invalidate and delete oldest.
|
|
if walks, ok := t.pool[oldest]; ok && len(walks) > 0 {
|
|
endCh := walks[0].endTimerCh
|
|
endWalkCh := walks[0].endWalkCh
|
|
if len(walks) > 1 {
|
|
// Move walks forward
|
|
copy(walks, walks[1:])
|
|
walks = walks[:len(walks)-1]
|
|
t.pool[oldest] = walks
|
|
} else {
|
|
// Only entry, just delete.
|
|
delete(t.pool, oldest)
|
|
}
|
|
select {
|
|
case endCh <- struct{}{}:
|
|
close(endWalkCh)
|
|
default:
|
|
}
|
|
} else {
|
|
// Shouldn't happen, but just in case.
|
|
delete(t.pool, oldest)
|
|
}
|
|
}
|
|
|
|
// Should be a buffered channel so that Release() never blocks.
|
|
endTimerCh := make(chan struct{}, 1)
|
|
walkInfo := treeWalk{
|
|
added: UTCNow(),
|
|
resultCh: resultCh,
|
|
endWalkCh: endWalkCh,
|
|
endTimerCh: endTimerCh,
|
|
}
|
|
|
|
// Append new walk info.
|
|
walks := t.pool[params]
|
|
if len(walks) < treeWalkSameEntryLimit {
|
|
t.pool[params] = append(walks, walkInfo)
|
|
} else {
|
|
// We are at limit, invalidate oldest, move list down and add new as last.
|
|
select {
|
|
case walks[0].endTimerCh <- struct{}{}:
|
|
close(walks[0].endWalkCh)
|
|
default:
|
|
}
|
|
copy(walks, walks[1:])
|
|
walks[len(walks)-1] = walkInfo
|
|
}
|
|
|
|
// Timer go-routine which times out after t.timeOut seconds.
|
|
go func(endTimerCh <-chan struct{}, walkInfo treeWalk) {
|
|
select {
|
|
// Wait until timeOut
|
|
case <-time.After(t.timeOut):
|
|
// Timeout has expired. Remove the treeWalk from treeWalkPool and
|
|
// end the treeWalk go-routine.
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
walks, ok := t.pool[params]
|
|
if ok {
|
|
// Trick of filtering without allocating
|
|
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
|
nwalks := walks[:0]
|
|
// Look for walkInfo, remove it from the walks list.
|
|
for _, walk := range walks {
|
|
if !reflect.DeepEqual(walk, walkInfo) {
|
|
nwalks = append(nwalks, walk)
|
|
}
|
|
}
|
|
if len(nwalks) == 0 {
|
|
// No more treeWalk go-routines associated with listParams
|
|
// hence remove map entry.
|
|
delete(t.pool, params)
|
|
} else {
|
|
// There are more treeWalk go-routines associated with listParams
|
|
// hence save the list in the map.
|
|
t.pool[params] = nwalks
|
|
}
|
|
}
|
|
// Signal the treeWalk go-routine to die.
|
|
close(endWalkCh)
|
|
case <-endTimerCh:
|
|
return
|
|
}
|
|
}(endTimerCh, walkInfo)
|
|
}
|