mirror of
https://github.com/minio/minio.git
synced 2025-01-15 16:53:16 -05:00
0c4be55936
Fixes two different types of problems - continuation of the problem seen in FS #9992 as not fixed for erasure coded deployments, reproduced this issue with spark and its fixed now - another issue was leaking walk go-routines which would lead to high memory usage and crash the system this is simply because all the walks which were purged at the top limit had leaking end walkers which would consume memory endlessly. closes #9966 closes #10088
354 lines
10 KiB
Go
354 lines
10 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2019, 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
globalMergeLookupTimeout = time.Minute * 1 // 1 minutes.
|
|
)
|
|
|
|
// mergeWalkVersions - represents the go routine that does the merge walk versions.
|
|
type mergeWalkVersions struct {
|
|
added time.Time
|
|
entryChs []FileInfoVersionsCh
|
|
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
|
|
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
|
}
|
|
|
|
// mergeWalk - represents the go routine that does the merge walk.
|
|
type mergeWalk struct {
|
|
added time.Time
|
|
entryChs []FileInfoCh
|
|
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
|
|
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
|
}
|
|
|
|
// MergeWalkVersionsPool - pool of mergeWalk go routines.
|
|
// A mergeWalk is added to the pool by Set() and removed either by
|
|
// doing a Release() or if the concerned timer goes off.
|
|
// mergeWalkPool's purpose is to maintain active mergeWalk go-routines in a map so that
|
|
// it can be looked up across related list calls.
|
|
type MergeWalkVersionsPool struct {
|
|
sync.Mutex
|
|
pool map[listParams][]mergeWalkVersions
|
|
timeOut time.Duration
|
|
}
|
|
|
|
// NewMergeWalkVersionsPool - initialize new tree walk pool for versions.
|
|
func NewMergeWalkVersionsPool(timeout time.Duration) *MergeWalkVersionsPool {
|
|
tPool := &MergeWalkVersionsPool{
|
|
pool: make(map[listParams][]mergeWalkVersions),
|
|
timeOut: timeout,
|
|
}
|
|
return tPool
|
|
}
|
|
|
|
// Release - similar to mergeWalkPool.Release but for versions.
|
|
func (t *MergeWalkVersionsPool) Release(params listParams) ([]FileInfoVersionsCh, chan struct{}) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
walks, ok := t.pool[params] // Pick the valid walks.
|
|
if !ok || len(walks) == 0 {
|
|
// Release return nil if params not found.
|
|
return nil, nil
|
|
}
|
|
|
|
// Pop out the first valid walk entry.
|
|
walk := walks[0]
|
|
walks = walks[1:]
|
|
if len(walks) > 0 {
|
|
t.pool[params] = walks
|
|
} else {
|
|
delete(t.pool, params)
|
|
}
|
|
walk.endTimerCh <- struct{}{}
|
|
return walk.entryChs, walk.endWalkCh
|
|
}
|
|
|
|
// Set - similar to mergeWalkPool.Set but for file versions
|
|
func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersionsCh, endWalkCh chan struct{}) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
// If we are above the limit delete at least one entry from the pool.
|
|
if len(t.pool) > treeWalkEntryLimit {
|
|
age := time.Now()
|
|
var oldest listParams
|
|
for k, v := range t.pool {
|
|
if len(v) == 0 {
|
|
delete(t.pool, k)
|
|
continue
|
|
}
|
|
// The first element is the oldest, so we only check that.
|
|
e := v[0]
|
|
if e.added.Before(age) {
|
|
oldest = k
|
|
age = e.added
|
|
}
|
|
}
|
|
// Invalidate and delete oldest.
|
|
if walks, ok := t.pool[oldest]; ok && len(walks) > 0 {
|
|
endCh := walks[0].endTimerCh
|
|
endWalkCh := walks[0].endWalkCh
|
|
if len(walks) > 1 {
|
|
// Move walks forward
|
|
copy(walks, walks[1:])
|
|
walks = walks[:len(walks)-1]
|
|
t.pool[oldest] = walks
|
|
} else {
|
|
// Only entry, just delete.
|
|
delete(t.pool, oldest)
|
|
}
|
|
select {
|
|
case endCh <- struct{}{}:
|
|
close(endWalkCh)
|
|
default:
|
|
}
|
|
} else {
|
|
// Shouldn't happen, but just in case.
|
|
delete(t.pool, oldest)
|
|
}
|
|
}
|
|
|
|
// Should be a buffered channel so that Release() never blocks.
|
|
endTimerCh := make(chan struct{}, 1)
|
|
|
|
walkInfo := mergeWalkVersions{
|
|
added: UTCNow(),
|
|
entryChs: resultChs,
|
|
endWalkCh: endWalkCh,
|
|
endTimerCh: endTimerCh,
|
|
}
|
|
|
|
// Append new walk info.
|
|
walks := t.pool[params]
|
|
if len(walks) < treeWalkSameEntryLimit {
|
|
t.pool[params] = append(walks, walkInfo)
|
|
} else {
|
|
// We are at limit, invalidate oldest, move list down and add new as last.
|
|
select {
|
|
case walks[0].endTimerCh <- struct{}{}:
|
|
close(walks[0].endWalkCh)
|
|
default:
|
|
}
|
|
copy(walks, walks[1:])
|
|
walks[len(walks)-1] = walkInfo
|
|
}
|
|
|
|
// Timer go-routine which times out after t.timeOut seconds.
|
|
go func(endTimerCh <-chan struct{}, walkInfo mergeWalkVersions) {
|
|
select {
|
|
// Wait until timeOut
|
|
case <-time.After(t.timeOut):
|
|
// Timeout has expired. Remove the mergeWalk from mergeWalkPool and
|
|
// end the mergeWalk go-routine.
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
walks, ok := t.pool[params]
|
|
if ok {
|
|
// Trick of filtering without allocating
|
|
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
|
nwalks := walks[:0]
|
|
// Look for walkInfo, remove it from the walks list.
|
|
for _, walk := range walks {
|
|
if !reflect.DeepEqual(walk, walkInfo) {
|
|
nwalks = append(nwalks, walk)
|
|
}
|
|
}
|
|
if len(nwalks) == 0 {
|
|
// No more mergeWalk go-routines associated with listParams
|
|
// hence remove map entry.
|
|
delete(t.pool, params)
|
|
} else {
|
|
// There are more mergeWalk go-routines associated with listParams
|
|
// hence save the list in the map.
|
|
t.pool[params] = nwalks
|
|
}
|
|
}
|
|
// Signal the mergeWalk go-routine to die.
|
|
close(endWalkCh)
|
|
case <-endTimerCh:
|
|
return
|
|
}
|
|
}(endTimerCh, walkInfo)
|
|
}
|
|
|
|
// MergeWalkPool - pool of mergeWalk go routines.
|
|
// A mergeWalk is added to the pool by Set() and removed either by
|
|
// doing a Release() or if the concerned timer goes off.
|
|
// mergeWalkPool's purpose is to maintain active mergeWalk go-routines in a map so that
|
|
// it can be looked up across related list calls.
|
|
type MergeWalkPool struct {
|
|
sync.Mutex
|
|
pool map[listParams][]mergeWalk
|
|
timeOut time.Duration
|
|
}
|
|
|
|
// NewMergeWalkPool - initialize new tree walk pool.
|
|
func NewMergeWalkPool(timeout time.Duration) *MergeWalkPool {
|
|
tPool := &MergeWalkPool{
|
|
pool: make(map[listParams][]mergeWalk),
|
|
timeOut: timeout,
|
|
}
|
|
return tPool
|
|
}
|
|
|
|
// Release - selects a mergeWalk from the pool based on the input
|
|
// listParams, removes it from the pool, and returns the MergeWalkResult
|
|
// channel.
|
|
// Returns nil if listParams does not have an associated mergeWalk.
|
|
func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
walks, ok := t.pool[params] // Pick the valid walks.
|
|
if !ok || len(walks) == 0 {
|
|
// Release return nil if params not found.
|
|
return nil, nil
|
|
}
|
|
|
|
// Pop out the first valid walk entry.
|
|
walk := walks[0]
|
|
walks[0] = mergeWalk{} // clear references.
|
|
walks = walks[1:]
|
|
if len(walks) > 0 {
|
|
t.pool[params] = walks
|
|
} else {
|
|
delete(t.pool, params)
|
|
}
|
|
walk.endTimerCh <- struct{}{}
|
|
return walk.entryChs, walk.endWalkCh
|
|
}
|
|
|
|
// Set - adds a mergeWalk to the mergeWalkPool.
|
|
// Also starts a timer go-routine that ends when:
|
|
// 1) time.After() expires after t.timeOut seconds.
|
|
// The expiration is needed so that the mergeWalk go-routine resources are freed after a timeout
|
|
// if the S3 client does only partial listing of objects.
|
|
// 2) Release() signals the timer go-routine to end on endTimerCh.
|
|
// During listing the timer should not timeout and end the mergeWalk go-routine, hence the
|
|
// timer go-routine should be ended.
|
|
func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh chan struct{}) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
// If we are above the limit delete at least one entry from the pool.
|
|
if len(t.pool) > treeWalkEntryLimit {
|
|
age := time.Now()
|
|
var oldest listParams
|
|
for k, v := range t.pool {
|
|
if len(v) == 0 {
|
|
delete(t.pool, k)
|
|
continue
|
|
}
|
|
// The first element is the oldest, so we only check that.
|
|
e := v[0]
|
|
if e.added.Before(age) {
|
|
oldest = k
|
|
age = e.added
|
|
}
|
|
}
|
|
// Invalidate and delete oldest.
|
|
if walks, ok := t.pool[oldest]; ok && len(walks) > 0 {
|
|
endCh := walks[0].endTimerCh
|
|
endWalkCh := walks[0].endWalkCh
|
|
if len(walks) > 1 {
|
|
// Move walks forward
|
|
copy(walks, walks[1:])
|
|
walks = walks[:len(walks)-1]
|
|
t.pool[oldest] = walks
|
|
} else {
|
|
// Only entry, just delete.
|
|
delete(t.pool, oldest)
|
|
}
|
|
select {
|
|
case endCh <- struct{}{}:
|
|
close(endWalkCh)
|
|
default:
|
|
}
|
|
} else {
|
|
// Shouldn't happen, but just in case.
|
|
delete(t.pool, oldest)
|
|
}
|
|
}
|
|
|
|
// Should be a buffered channel so that Release() never blocks.
|
|
endTimerCh := make(chan struct{}, 1)
|
|
walkInfo := mergeWalk{
|
|
added: UTCNow(),
|
|
entryChs: resultChs,
|
|
endWalkCh: endWalkCh,
|
|
endTimerCh: endTimerCh,
|
|
}
|
|
|
|
// Append new walk info.
|
|
walks := t.pool[params]
|
|
if len(walks) < treeWalkSameEntryLimit {
|
|
t.pool[params] = append(walks, walkInfo)
|
|
} else {
|
|
// We are at limit, invalidate oldest, move list down and add new as last.
|
|
select {
|
|
case walks[0].endTimerCh <- struct{}{}:
|
|
close(walks[0].endWalkCh)
|
|
default:
|
|
}
|
|
copy(walks, walks[1:])
|
|
walks[len(walks)-1] = walkInfo
|
|
}
|
|
|
|
// Timer go-routine which times out after t.timeOut seconds.
|
|
go func(endTimerCh <-chan struct{}, walkInfo mergeWalk) {
|
|
select {
|
|
// Wait until timeOut
|
|
case <-time.After(t.timeOut):
|
|
// Timeout has expired. Remove the mergeWalk from mergeWalkPool and
|
|
// end the mergeWalk go-routine.
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
walks, ok := t.pool[params]
|
|
if ok {
|
|
// Trick of filtering without allocating
|
|
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
|
nwalks := walks[:0]
|
|
// Look for walkInfo, remove it from the walks list.
|
|
for _, walk := range walks {
|
|
if !reflect.DeepEqual(walk, walkInfo) {
|
|
nwalks = append(nwalks, walk)
|
|
}
|
|
}
|
|
if len(nwalks) == 0 {
|
|
// No more mergeWalk go-routines associated with listParams
|
|
// hence remove map entry.
|
|
delete(t.pool, params)
|
|
} else {
|
|
// There are more mergeWalk go-routines associated with listParams
|
|
// hence save the list in the map.
|
|
t.pool[params] = nwalks
|
|
}
|
|
}
|
|
// Signal the mergeWalk go-routine to die.
|
|
close(endWalkCh)
|
|
case <-endTimerCh:
|
|
return
|
|
}
|
|
}(endTimerCh, walkInfo)
|
|
}
|