mirror of
https://github.com/minio/minio.git
synced 2025-01-15 00:35:02 -05:00
165 lines
4.2 KiB
Go
165 lines
4.2 KiB
Go
|
/*
|
||
|
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
||
|
*
|
||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
* you may not use this file except in compliance with the License.
|
||
|
* You may obtain a copy of the License at
|
||
|
*
|
||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||
|
*
|
||
|
* Unless required by applicable law or agreed to in writing, software
|
||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
* See the License for the specific language governing permissions and
|
||
|
* limitations under the License.
|
||
|
*/
|
||
|
|
||
|
package tasker
|
||
|
|
||
|
import (
|
||
|
"container/list"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
// TaskCtl (Task Controller) is a framework to create and manage
|
||
|
// tasks.
|
||
|
type TaskCtl struct {
|
||
|
mutex *sync.Mutex // Lock
|
||
|
// List of tasks managed by this task controller.
|
||
|
tasks *list.List
|
||
|
}
|
||
|
|
||
|
// New creates a new TaskCtl to create and control a collection of tasks.
|
||
|
// Single application can create multiple task controllers to manage different set of tasks separately.
|
||
|
func New(name string) *TaskCtl {
|
||
|
return &TaskCtl{
|
||
|
mutex: &sync.Mutex{},
|
||
|
tasks: list.New(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// NewTask creates a new task structure and returns a handle to it. Only the task controller
|
||
|
// has access to the task structure. The caller routine only receives a handle to its task structure.
|
||
|
// Task handle is like a reference to task self. Caller is expected to listen for commands from
|
||
|
// the task controller and comply with it co-operatively.
|
||
|
func (tc *TaskCtl) NewTask(name string) Handle {
|
||
|
tc.mutex.Lock()
|
||
|
defer tc.mutex.Unlock()
|
||
|
|
||
|
// Create a new task.
|
||
|
tsk := newTask(name)
|
||
|
|
||
|
// Register this task in the TaskCtl's tasklist and save the reference.
|
||
|
tsk.this = tc.tasks.PushBack(tsk)
|
||
|
|
||
|
// Free task from the tasklist upon close call.
|
||
|
go func() {
|
||
|
// Release the tasks resources upon return of this function.
|
||
|
defer tsk.close()
|
||
|
|
||
|
// Will be notified here upon task's end of life.
|
||
|
this := <-tsk.closeCh
|
||
|
|
||
|
tc.mutex.Lock()
|
||
|
defer tc.mutex.Unlock()
|
||
|
|
||
|
// Release the task structure from the task list.
|
||
|
tc.tasks.Remove(this)
|
||
|
}()
|
||
|
|
||
|
// Return a handle to this task.
|
||
|
return tsk.getHandle()
|
||
|
}
|
||
|
|
||
|
// Shutdown ends all tasks, including the suspended ones.
|
||
|
func (tc *TaskCtl) Shutdown() {
|
||
|
tc.mutex.Lock()
|
||
|
defer tc.mutex.Unlock()
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
|
||
|
// End all tasks.
|
||
|
for e := tc.tasks.Front(); e != nil; e = e.Next() {
|
||
|
wg.Add(1)
|
||
|
thisTask := e.Value.(task) // Make a local copy for go routine.
|
||
|
// End tasks in background. Flow of events from here is as follows: thisTask.handle.Close() -> tc.NewTask() -> this.task.close().
|
||
|
go func() {
|
||
|
thisTask.getHandle().Close()
|
||
|
wg.Done()
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
wg.Wait() // Wait for all tasks to end gracefully.
|
||
|
|
||
|
// Reset the task pool.
|
||
|
tc.tasks = nil
|
||
|
}
|
||
|
|
||
|
// Suspend puts all tasks to sleep.
|
||
|
func (tc *TaskCtl) Suspend() bool {
|
||
|
tc.mutex.Lock()
|
||
|
defer tc.mutex.Unlock()
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
|
||
|
// If any one of the task fails to suspend, this flag will set to false.
|
||
|
statusAll := make([]status, tc.tasks.Len())
|
||
|
|
||
|
// Suspend all tasks.
|
||
|
i := 0
|
||
|
for e := tc.tasks.Front(); e != nil; e = e.Next() {
|
||
|
wg.Add(1)
|
||
|
locTask := e.Value.(task) // Make a local copy for go routine.
|
||
|
locI := i // local i
|
||
|
// Suspend a task in background.
|
||
|
go func(locI int) {
|
||
|
defer wg.Done()
|
||
|
statusAll[locI] = locTask.command(CmdSignalSuspend)
|
||
|
}(locI)
|
||
|
i++
|
||
|
}
|
||
|
|
||
|
wg.Wait() // Wait for all tasks to suspend gracefully.
|
||
|
|
||
|
for _, st := range statusAll {
|
||
|
if st.code != statusDone {
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// Resume wakes up all suspended task from sleep.
|
||
|
func (tc *TaskCtl) Resume() bool {
|
||
|
tc.mutex.Lock()
|
||
|
defer tc.mutex.Unlock()
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
|
||
|
// If any one of the task fails to suspend, this flag will set to false.
|
||
|
statusAll := make([]status, tc.tasks.Len())
|
||
|
|
||
|
i := 0
|
||
|
// Resume all suspended tasks.
|
||
|
for e := tc.tasks.Front(); e != nil; e = e.Next() {
|
||
|
wg.Add(1)
|
||
|
locTask := e.Value.(task) // Make a local copy for go routine.
|
||
|
locI := i // local i
|
||
|
// Resume a task in background.
|
||
|
go func(locI int) {
|
||
|
defer wg.Done()
|
||
|
statusAll[locI] = locTask.command(CmdSignalResume)
|
||
|
}(locI)
|
||
|
i++
|
||
|
}
|
||
|
wg.Wait() // Wait for all tasks to resume.
|
||
|
|
||
|
for _, st := range statusAll {
|
||
|
if st.code != statusDone {
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
return true
|
||
|
|
||
|
}
|