diff --git a/cmd/merge-walk-pool.go b/cmd/merge-walk-pool.go index 309c1bca9..a68dc2320 100644 --- a/cmd/merge-walk-pool.go +++ b/cmd/merge-walk-pool.go @@ -35,6 +35,7 @@ type mergeWalkVersions struct { // mergeWalk - represents the go routine that does the merge walk. type mergeWalk struct { + added time.Time entryChs []FileInfoCh endWalkCh chan struct{} // To signal when mergeWalk go-routine should end. endTimerCh chan<- struct{} // To signal when timer go-routine should end. @@ -160,7 +161,7 @@ func NewMergeWalkPool(timeout time.Duration) *MergeWalkPool { // Release - selects a mergeWalk from the pool based on the input // listParams, removes it from the pool, and returns the MergeWalkResult // channel. -// Returns nil if listParams does not have an asccociated mergeWalk. +// Returns nil if listParams does not have an associated mergeWalk. func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) { t.Lock() defer t.Unlock() @@ -169,6 +170,7 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) if len(walks) > 0 { // Pop out the first valid walk entry. walk := walks[0] + walks[0] = mergeWalk{} // clear references. walks = walks[1:] if len(walks) > 0 { t.pool[params] = walks @@ -195,17 +197,54 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh t.Lock() defer t.Unlock() + // If we are above the limit delete at least one entry from the pool. + if len(t.pool) > treeWalkEntryLimit { + age := time.Now() + var oldest listParams + for k, v := range t.pool { + if len(v) == 0 { + delete(t.pool, k) + continue + } + // The first element is the oldest, so we only check that. + if v[0].added.Before(age) { + oldest = k + } + } + // Invalidate and delete oldest. + if walks, ok := t.pool[oldest]; ok { + walk := walks[0] + walks[0] = mergeWalk{} // clear references. + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks + } else { + delete(t.pool, params) + } + walk.endTimerCh <- struct{}{} + } + } + // Should be a buffered channel so that Release() never blocks. endTimerCh := make(chan struct{}, 1) walkInfo := mergeWalk{ + added: UTCNow(), entryChs: resultChs, endWalkCh: endWalkCh, endTimerCh: endTimerCh, } // Append new walk info. - t.pool[params] = append(t.pool[params], walkInfo) + walks := t.pool[params] + if len(walks) < treeWalkSameEntryLimit { + t.pool[params] = append(walks, walkInfo) + } else { + // We are at limit, invalidate oldest, move list down and add new as last. + walks[0].endTimerCh <- struct{}{} + copy(walks, walks[1:]) + walks[len(walks)-1] = walkInfo + } // Timer go-routine which times out after t.timeOut seconds. go func(endTimerCh <-chan struct{}, walkInfo mergeWalk) { diff --git a/cmd/merge-walk-pool_test.go b/cmd/merge-walk-pool_test.go index 6d235a442..ee127162c 100644 --- a/cmd/merge-walk-pool_test.go +++ b/cmd/merge-walk-pool_test.go @@ -74,7 +74,7 @@ func TestManyMergeWalksSameParam(t *testing.T) { break default: // Create many treeWalk go-routines for the same params. - for i := 0; i < 10; i++ { + for i := 0; i < treeWalkSameEntryLimit; i++ { endWalkCh := make(chan struct{}) walkChs := make([]FileInfoCh, 0) tw.Set(params, walkChs, endWalkCh) @@ -82,16 +82,62 @@ func TestManyMergeWalksSameParam(t *testing.T) { tw.Lock() if walks, ok := tw.pool[params]; ok { - if len(walks) != 10 { + if len(walks) != treeWalkSameEntryLimit { t.Error("There aren't as many walks as were Set") } } tw.Unlock() - for i := 0; i < 10; i++ { + for i := 0; i < treeWalkSameEntryLimit; i++ { tw.Lock() if walks, ok := tw.pool[params]; ok { - // Before ith Release we should have 10-i treeWalk go-routines. - if 10-i != len(walks) { + // Before ith Release we should have n-i treeWalk go-routines. + if treeWalkSameEntryLimit-i != len(walks) { + t.Error("There aren't as many walks as were Set") + } + } + tw.Unlock() + tw.Release(params) + } + } + +} + +// Test if multiple merge walkers for the same listParams are managed as expected by the pool +// but that treeWalkSameEntryLimit is respected. +func TestManyMergeWalksSameParamPrune(t *testing.T) { + // Create a treeWalkPool. + tw := NewMergeWalkPool(5 * time.Second) + + // Create sample params. + params := listParams{ + bucket: "test-bucket", + } + + select { + // This timeout is an upper-bound. This is started + // before the first treeWalk go-routine's timeout period starts. + case <-time.After(5 * time.Second): + break + default: + // Create many treeWalk go-routines for the same params. + for i := 0; i < treeWalkSameEntryLimit*4; i++ { + endWalkCh := make(chan struct{}) + walkChs := make([]FileInfoCh, 0) + tw.Set(params, walkChs, endWalkCh) + } + + tw.Lock() + if walks, ok := tw.pool[params]; ok { + if len(walks) > treeWalkSameEntryLimit { + t.Error("There aren't as many walks as were Set") + } + } + tw.Unlock() + for i := 0; i < treeWalkSameEntryLimit; i++ { + tw.Lock() + if walks, ok := tw.pool[params]; ok { + // Before ith Release we should have n-i treeWalk go-routines. + if treeWalkSameEntryLimit-i != len(walks) { t.Error("There aren't as many walks as were Set") } } diff --git a/cmd/tree-walk-pool.go b/cmd/tree-walk-pool.go index 9eeae6554..54e568304 100644 --- a/cmd/tree-walk-pool.go +++ b/cmd/tree-walk-pool.go @@ -25,7 +25,9 @@ import ( // Global lookup timeout. const ( - globalLookupTimeout = time.Minute * 30 // 30minutes. + globalLookupTimeout = time.Minute * 30 // 30minutes. + treeWalkEntryLimit = 50 + treeWalkSameEntryLimit = 4 ) // listParams - list object params used for list object map @@ -44,6 +46,7 @@ var errWalkAbort = errors.New("treeWalk abort") // treeWalk - represents the go routine that does the file tree walk. type treeWalk struct { + added time.Time resultCh chan TreeWalkResult endWalkCh chan struct{} // To signal when treeWalk go-routine should end. endTimerCh chan<- struct{} // To signal when timer go-routine should end. @@ -72,7 +75,7 @@ func NewTreeWalkPool(timeout time.Duration) *TreeWalkPool { // Release - selects a treeWalk from the pool based on the input // listParams, removes it from the pool, and returns the TreeWalkResult // channel. -// Returns nil if listParams does not have an asccociated treeWalk. +// Returns nil if listParams does not have an associated treeWalk. func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, endWalkCh chan struct{}) { t.Lock() defer t.Unlock() @@ -81,6 +84,7 @@ func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, if len(walks) > 0 { // Pop out the first valid walk entry. walk := walks[0] + walks[0] = treeWalk{} // clear references. walks = walks[1:] if len(walks) > 0 { t.pool[params] = walks @@ -100,22 +104,59 @@ func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, // 1) time.After() expires after t.timeOut seconds. // The expiration is needed so that the treeWalk go-routine resources are freed after a timeout // if the S3 client does only partial listing of objects. -// 2) Relase() signals the timer go-routine to end on endTimerCh. +// 2) Release() signals the timer go-routine to end on endTimerCh. // During listing the timer should not timeout and end the treeWalk go-routine, hence the // timer go-routine should be ended. func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endWalkCh chan struct{}) { t.Lock() defer t.Unlock() + // If we are above the limit delete at least one entry from the pool. + if len(t.pool) > treeWalkEntryLimit { + age := time.Now() + var oldest listParams + for k, v := range t.pool { + if len(v) == 0 { + delete(t.pool, k) + continue + } + // The first element is the oldest, so we only check that. + if v[0].added.Before(age) { + oldest = k + } + } + // Invalidate and delete oldest. + if walks, ok := t.pool[oldest]; ok { + walk := walks[0] + walks[0] = treeWalk{} // clear references. + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks + } else { + delete(t.pool, params) + } + walk.endTimerCh <- struct{}{} + } + } + // Should be a buffered channel so that Release() never blocks. endTimerCh := make(chan struct{}, 1) walkInfo := treeWalk{ + added: UTCNow(), resultCh: resultCh, endWalkCh: endWalkCh, endTimerCh: endTimerCh, } // Append new walk info. - t.pool[params] = append(t.pool[params], walkInfo) + walks := t.pool[params] + if len(walks) < treeWalkSameEntryLimit { + t.pool[params] = append(walks, walkInfo) + } else { + // We are at limit, invalidate oldest, move list down and add new as last. + walks[0].endTimerCh <- struct{}{} + copy(walks, walks[1:]) + walks[len(walks)-1] = walkInfo + } // Timer go-routine which times out after t.timeOut seconds. go func(endTimerCh <-chan struct{}) { diff --git a/cmd/tree-walk-pool_test.go b/cmd/tree-walk-pool_test.go index e10970d97..313383d1a 100644 --- a/cmd/tree-walk-pool_test.go +++ b/cmd/tree-walk-pool_test.go @@ -74,7 +74,7 @@ func TestManyWalksSameParam(t *testing.T) { break default: // Create many treeWalk go-routines for the same params. - for i := 0; i < 10; i++ { + for i := 0; i < treeWalkSameEntryLimit; i++ { resultCh := make(chan TreeWalkResult) endWalkCh := make(chan struct{}) tw.Set(params, resultCh, endWalkCh) @@ -82,16 +82,61 @@ func TestManyWalksSameParam(t *testing.T) { tw.Lock() if walks, ok := tw.pool[params]; ok { - if len(walks) != 10 { + if len(walks) != treeWalkSameEntryLimit { t.Error("There aren't as many walks as were Set") } } tw.Unlock() - for i := 0; i < 10; i++ { + for i := 0; i < treeWalkSameEntryLimit; i++ { tw.Lock() if walks, ok := tw.pool[params]; ok { - // Before ith Release we should have 10-i treeWalk go-routines. - if 10-i != len(walks) { + // Before ith Release we should have n-i treeWalk go-routines. + if treeWalkSameEntryLimit-i != len(walks) { + t.Error("There aren't as many walks as were Set") + } + } + tw.Unlock() + tw.Release(params) + } + } +} + +// Test if multiple tree walkers for the same listParams are managed as expected by the pool +// but that treeWalkSameEntryLimit is respected. +func TestManyWalksSameParamPrune(t *testing.T) { + // Create a treeWalkPool. + tw := NewTreeWalkPool(5 * time.Second) + + // Create sample params. + params := listParams{ + bucket: "test-bucket", + } + + select { + // This timeout is an upper-bound. This is started + // before the first treeWalk go-routine's timeout period starts. + case <-time.After(5 * time.Second): + break + default: + // Create many treeWalk go-routines for the same params. + for i := 0; i < treeWalkSameEntryLimit*4; i++ { + resultCh := make(chan TreeWalkResult) + endWalkCh := make(chan struct{}) + tw.Set(params, resultCh, endWalkCh) + } + + tw.Lock() + if walks, ok := tw.pool[params]; ok { + if len(walks) != treeWalkSameEntryLimit { + t.Error("There aren't as many walks as were Set") + } + } + tw.Unlock() + for i := 0; i < treeWalkSameEntryLimit; i++ { + tw.Lock() + if walks, ok := tw.pool[params]; ok { + // Before ith Release we should have n-i treeWalk go-routines. + if treeWalkSameEntryLimit-i != len(walks) { t.Error("There aren't as many walks as were Set") } } @@ -99,5 +144,4 @@ func TestManyWalksSameParam(t *testing.T) { tw.Release(params) } } - }