Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 36 additions & 39 deletions common/prque/lazyqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,31 @@ import (
//
// If the upper estimate is exceeded then Update should be called for that item.
// A global Refresh function should also be called periodically.
type LazyQueue struct {
type LazyQueue[V any] struct {
clock mclock.Clock
// Items are stored in one of two internal queues ordered by estimated max
// priority until the next and the next-after-next refresh. Update and Refresh
// always places items in queue[1].
queue [2]*sstack
popQueue *sstack
queue [2]*sstack[V]
popQueue *sstack[V]
period time.Duration
maxUntil mclock.AbsTime
indexOffset int
setIndex SetIndexCallback
priority PriorityCallback
maxPriority MaxPriorityCallback
setIndex SetIndexCallback[V]
priority PriorityCallback[V]
maxPriority MaxPriorityCallback[V]
lastRefresh1, lastRefresh2 mclock.AbsTime
}

type (
PriorityCallback func(data interface{}) int64 // actual priority callback
MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
PriorityCallback[V any] func(data V) int64 // actual priority callback
MaxPriorityCallback[V any] func(data V, until mclock.AbsTime) int64 // estimated maximum priority callback
)

// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{
popQueue: newSstack(nil, false),
func NewLazyQueue[V any](setIndex SetIndexCallback[V], priority PriorityCallback[V], maxPriority MaxPriorityCallback[V], clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue[V] {
q := &LazyQueue[V]{
popQueue: newSstack[V](nil, false),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
Expand All @@ -71,13 +71,13 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior
}

// Reset clears the contents of the queue
func (q *LazyQueue) Reset() {
q.queue[0] = newSstack(q.setIndex0, false)
q.queue[1] = newSstack(q.setIndex1, false)
func (q *LazyQueue[V]) Reset() {
q.queue[0] = newSstack[V](q.setIndex0, false)
q.queue[1] = newSstack[V](q.setIndex1, false)
}

// Refresh performs queue re-evaluation if necessary
func (q *LazyQueue) Refresh() {
func (q *LazyQueue[V]) Refresh() {
now := q.clock.Now()
for time.Duration(now-q.lastRefresh2) >= q.period*2 {
q.refresh(now)
Expand All @@ -87,33 +87,33 @@ func (q *LazyQueue) Refresh() {
}

// refresh re-evaluates items in the older queue and swaps the two queues
func (q *LazyQueue) refresh(now mclock.AbsTime) {
func (q *LazyQueue[V]) refresh(now mclock.AbsTime) {
q.maxUntil = now.Add(q.period)
for q.queue[0].Len() != 0 {
q.Push(heap.Pop(q.queue[0]).(*item).value)
q.Push(heap.Pop(q.queue[0]).(*item[V]).value)
}
q.queue[0], q.queue[1] = q.queue[1], q.queue[0]
q.indexOffset = 1 - q.indexOffset
q.maxUntil = q.maxUntil.Add(q.period)
}

// Push adds an item to the queue
func (q *LazyQueue) Push(data interface{}) {
heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)})
func (q *LazyQueue[V]) Push(data V) {
heap.Push(q.queue[1], &item[V]{data, q.maxPriority(data, q.maxUntil)})
}

// Update updates the upper priority estimate for the item with the given queue index
func (q *LazyQueue) Update(index int) {
func (q *LazyQueue[V]) Update(index int) {
q.Push(q.Remove(index))
}

// Pop removes and returns the item with the greatest actual priority
func (q *LazyQueue) Pop() (interface{}, int64) {
func (q *LazyQueue[V]) Pop() (V, int64) {
var (
resData interface{}
resData V
resPri int64
)
q.MultiPop(func(data interface{}, priority int64) bool {
q.MultiPop(func(data V, priority int64) bool {
resData = data
resPri = priority
return false
Expand All @@ -123,7 +123,7 @@ func (q *LazyQueue) Pop() (interface{}, int64) {

// peekIndex returns the index of the internal queue where the item with the
// highest estimated priority is or -1 if both are empty
func (q *LazyQueue) peekIndex() int {
func (q *LazyQueue[V]) peekIndex() int {
if q.queue[0].Len() != 0 {
if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
return 1
Expand All @@ -139,17 +139,17 @@ func (q *LazyQueue) peekIndex() int {
// MultiPop pops multiple items from the queue and is more efficient than calling
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
func (q *LazyQueue[V]) MultiPop(callback func(data V, priority int64) bool) {
nextIndex := q.peekIndex()
for nextIndex != -1 {
data := heap.Pop(q.queue[nextIndex]).(*item).value
heap.Push(q.popQueue, &item{data, q.priority(data)})
data := heap.Pop(q.queue[nextIndex]).(*item[V]).value
heap.Push(q.popQueue, &item[V]{data, q.priority(data)})
nextIndex = q.peekIndex()
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
i := heap.Pop(q.popQueue).(*item)
i := heap.Pop(q.popQueue).(*item[V])
if !callback(i.value, i.priority) {
for q.popQueue.Len() != 0 {
q.Push(heap.Pop(q.popQueue).(*item).value)
q.Push(heap.Pop(q.popQueue).(*item[V]).value)
}
return
}
Expand All @@ -159,31 +159,28 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo
}

// PopItem pops the item from the queue only, dropping the associated priority value.
func (q *LazyQueue) PopItem() interface{} {
func (q *LazyQueue[V]) PopItem() V {
i, _ := q.Pop()
return i
}

// Remove removes the item with the given index.
func (q *LazyQueue) Remove(index int) interface{} {
if index < 0 {
return nil
}
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value
func (q *LazyQueue[V]) Remove(index int) V {
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item[V]).value
}

// Empty checks whether the priority queue is empty.
func (q *LazyQueue) Empty() bool {
func (q *LazyQueue[V]) Empty() bool {
return q.queue[0].Len() == 0 && q.queue[1].Len() == 0
}

// Size returns the number of items in the priority queue.
func (q *LazyQueue) Size() int {
func (q *LazyQueue[V]) Size() int {
return q.queue[0].Len() + q.queue[1].Len()
}

// setIndex0 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex0(data interface{}, index int) {
func (q *LazyQueue[V]) setIndex0(data V, index int) {
if index == -1 {
q.setIndex(data, -1)
} else {
Expand All @@ -192,6 +189,6 @@ func (q *LazyQueue) setIndex0(data interface{}, index int) {
}

// setIndex1 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex1(data interface{}, index int) {
func (q *LazyQueue[V]) setIndex1(data V, index int) {
q.setIndex(data, index+index+1)
}
39 changes: 18 additions & 21 deletions common/prque/prque.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,62 +22,59 @@ import (
)

// Priority queue data structure.
type Prque struct {
cont *sstack
type Prque[V any] struct {
cont *sstack[V]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could have parameters P comparable, V any here. That way, one could use any integer type as key.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LES uses some weird "wrap around" notion for the priorities, which requires subtracting one priority from another. @zsfelfoldi said that that is a temporary hack 2 years ago :P Until that's fixed, I can't make the priority "comparable" only.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be the code to make the priority generic too: #26290, but it changes LES in - a probably silently - breaking way. We need a fix / confirm frmo @zsfelfoldi to make this change.

}

// New creates a new priority queue.
func New(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex, false)}
func New[V any](setIndex SetIndexCallback[V]) *Prque[V] {
return &Prque[V]{newSstack(setIndex, false)}
}

// NewWrapAround creates a new priority queue with wrap-around priority handling.
func NewWrapAround(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex, true)}
func NewWrapAround[V any](setIndex SetIndexCallback[V]) *Prque[V] {
return &Prque[V]{newSstack(setIndex, true)}
}

// Pushes a value with a given priority into the queue, expanding if necessary.
func (p *Prque) Push(data interface{}, priority int64) {
heap.Push(p.cont, &item{data, priority})
func (p *Prque[V]) Push(data V, priority int64) {
heap.Push(p.cont, &item[V]{data, priority})
}

// Peek returns the value with the greatest priority but does not pop it off.
func (p *Prque) Peek() (interface{}, int64) {
func (p *Prque[V]) Peek() (V, int64) {
item := p.cont.blocks[0][0]
return item.value, item.priority
}

// Pops the value with the greatest priority off the stack and returns it.
// Currently no shrinking is done.
func (p *Prque) Pop() (interface{}, int64) {
item := heap.Pop(p.cont).(*item)
func (p *Prque[V]) Pop() (V, int64) {
item := heap.Pop(p.cont).(*item[V])
return item.value, item.priority
}

// Pops only the item from the queue, dropping the associated priority value.
func (p *Prque) PopItem() interface{} {
return heap.Pop(p.cont).(*item).value
func (p *Prque[V]) PopItem() V {
return heap.Pop(p.cont).(*item[V]).value
}

// Remove removes the element with the given index.
func (p *Prque) Remove(i int) interface{} {
if i < 0 {
return nil
}
return heap.Remove(p.cont, i)
func (p *Prque[V]) Remove(i int) V {
return heap.Remove(p.cont, i).(*item[V]).value
}

// Checks whether the priority queue is empty.
func (p *Prque) Empty() bool {
func (p *Prque[V]) Empty() bool {
return p.cont.Len() == 0
}

// Returns the number of element in the priority queue.
func (p *Prque) Size() int {
func (p *Prque[V]) Size() int {
return p.cont.Len()
}

// Clears the contents of the priority queue.
func (p *Prque) Reset() {
func (p *Prque[V]) Reset() {
*p = *New(p.cont.setIndex)
}
8 changes: 4 additions & 4 deletions common/prque/prque_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestPrque(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = rand.Int()
}
queue := New(nil)
queue := New[int](nil)
for rep := 0; rep < 2; rep++ {
// Fill a priority queue with the above data
for i := 0; i < size; i++ {
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestReset(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = rand.Int()
}
queue := New(nil)
queue := New[int](nil)
for rep := 0; rep < 2; rep++ {
// Fill a priority queue with the above data
for i := 0; i < size; i++ {
Expand Down Expand Up @@ -104,7 +104,7 @@ func BenchmarkPush(b *testing.B) {
}
// Execute the benchmark
b.ResetTimer()
queue := New(nil)
queue := New[int](nil)
for i := 0; i < len(data); i++ {
queue.Push(data[i], prio[i])
}
Expand All @@ -118,7 +118,7 @@ func BenchmarkPop(b *testing.B) {
data[i] = rand.Int()
prio[i] = rand.Int63()
}
queue := New(nil)
queue := New[int](nil)
for i := 0; i < len(data); i++ {
queue.Push(data[i], prio[i])
}
Expand Down
Loading