Release v0.3.0

This commit is contained in:
Manu Herrera
2020-11-09 10:05:29 -03:00
parent 4e9aa7a3c5
commit 8107c4478b
1265 changed files with 440488 additions and 107809 deletions

View File

@@ -3,7 +3,6 @@ package queue
import (
"container/list"
"sync"
"sync/atomic"
)
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
@@ -12,8 +11,8 @@ import (
// items from the in channel to the out channel in the correct order that must
// be started by calling Start().
type ConcurrentQueue struct {
started uint32 // to be used atomically
stopped uint32 // to be used atomically
started sync.Once
stopped sync.Once
chanIn chan interface{}
chanOut chan interface{}
@@ -51,14 +50,15 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
// minimize overhead, but if the out channel is full it pushes items to an
// overflow queue. This must be called before using the queue.
func (cq *ConcurrentQueue) Start() {
if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) {
return
}
cq.started.Do(cq.start)
}
func (cq *ConcurrentQueue) start() {
cq.wg.Add(1)
go func() {
defer cq.wg.Done()
readLoop:
for {
nextElement := cq.overflow.Front()
if nextElement == nil {
@@ -66,7 +66,10 @@ func (cq *ConcurrentQueue) Start() {
// directly to the output channel. If output channel is full
// though, push to overflow.
select {
case item := <-cq.chanIn:
case item, ok := <-cq.chanIn:
if !ok {
break readLoop
}
select {
case cq.chanOut <- item:
// Optimistically push directly to chanOut
@@ -80,7 +83,10 @@ func (cq *ConcurrentQueue) Start() {
// Overflow queue is not empty, so any new items get pushed to
// the back to preserve order.
select {
case item := <-cq.chanIn:
case item, ok := <-cq.chanIn:
if !ok {
break readLoop
}
cq.overflow.PushBack(item)
case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement)
@@ -89,6 +95,22 @@ func (cq *ConcurrentQueue) Start() {
}
}
}
// Incoming channel has been closed. Empty overflow queue into
// the outgoing channel.
nextElement := cq.overflow.Front()
for nextElement != nil {
select {
case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement)
case <-cq.quit:
return
}
nextElement = cq.overflow.Front()
}
// Close outgoing channel.
close(cq.chanOut)
}()
}
@@ -96,10 +118,8 @@ func (cq *ConcurrentQueue) Start() {
// channel. This does not clear the queue state, so the queue can be restarted
// without dropping items.
func (cq *ConcurrentQueue) Stop() {
if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) {
return
}
close(cq.quit)
cq.wg.Wait()
cq.stopped.Do(func() {
close(cq.quit)
cq.wg.Wait()
})
}