Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
caffix committed Oct 19, 2024
1 parent 5a9dce0 commit 26292e1
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (q *queue) append(data interface{}, priority QueuePriority) {

// Signal implements the Queue interface.
func (q *queue) Signal() <-chan struct{} {
q.Lock()
defer q.Unlock()

q.prepSignal()
return q.signal
}
Expand All @@ -101,7 +104,7 @@ func (q *queue) prepSignal() {
default:
}

if !send && q.Len() > 0 {
if !send && q.lenWithoutLock() > 0 {
send = true
}
if send {
Expand All @@ -113,12 +116,11 @@ func (q *queue) prepSignal() {
}

func (q *queue) drain() {
loop:
for {
select {
case <-q.signal:
default:
break loop
return
}
}
}
Expand Down Expand Up @@ -146,6 +148,7 @@ func (q *queue) Next() (interface{}, bool) {
return nil, false
}

q.prepSignal()
return data, true
}

Expand All @@ -169,10 +172,13 @@ func (q *queue) Len() int {
q.Lock()
defer q.Unlock()

return q.lenWithoutLock()
}

func (q *queue) lenWithoutLock() int {
qlen := len(q.low)
qlen += len(q.norm)
qlen += len(q.high)
qlen += len(q.crit)

return qlen
}

0 comments on commit 26292e1

Please sign in to comment.