Skip to content

Commit 5e31fa7

Browse files
committed
#15 Add support for priority prioritySelect() for channels.
1 parent 0fe0a2a commit 5e31fa7

File tree

2 files changed

+144
-45
lines changed

2 files changed

+144
-45
lines changed

src/main/scala/com/github/yruslan/channel/Channel.scala

Lines changed: 82 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ abstract class Channel[T] extends ReadChannel[T] with WriteChannel[T] {
101101

102102
protected def fetchValueOpt(): Option[T]
103103

104-
final override def sender(value: T) (action: => Unit = {}): Selector = {
104+
final override def sender(value: T)(action: => Unit = {}): Selector = {
105105
new Selector(true, this) {
106106
override def sendRecv(): Boolean = trySend(value)
107107

@@ -250,23 +250,23 @@ object Channel {
250250
val CLOSED = 2
251251

252252
/**
253-
* Create a synchronous channel.
254-
*
255-
* @tparam T The type of the channel.
256-
* @return A new channel
257-
*/
253+
* Create a synchronous channel.
254+
*
255+
* @tparam T The type of the channel.
256+
* @return A new channel
257+
*/
258258
def make[T]: Channel[T] = {
259259
new SyncChannel[T]
260260
}
261261

262262
/**
263-
* Create a channel. By default a synchronous channel will be created.
264-
* If bufferSize is greater then zero, a buffered channel will be created.
265-
*
266-
* @param bufferSize Asynchronous buffer size.
267-
* @tparam T The type of the channel.
268-
* @return A new channel
269-
*/
263+
* Create a channel. By default a synchronous channel will be created.
264+
* If bufferSize is greater then zero, a buffered channel will be created.
265+
*
266+
* @param bufferSize Asynchronous buffer size.
267+
* @tparam T The type of the channel.
268+
* @return A new channel
269+
*/
270270
def make[T](bufferSize: Int): Channel[T] = {
271271
require(bufferSize >= 0)
272272

@@ -288,41 +288,86 @@ object Channel {
288288
}
289289

290290
/**
291-
* Waits for a non-blocking operation to be available on the list of channels.
292-
*
293-
* @param selector A first channel to wait for (mandatory).
294-
* @param selectors Other channels to wait for.
295-
* @return true is none of the channels are closed and select() can be invoked again, false if at least one of channels is closed.
296-
*/
291+
* Waits for a non-blocking operation to be available on the list of channels.
292+
* If more than one channel is ready to perform its operation, the channel to perform the operation on will be chosen
293+
* at random.
294+
*
295+
* @param selector A first channel to wait for (mandatory).
296+
* @param selectors Other channels to wait for.
297+
* @return true is none of the channels are closed and select() can be invoked again, false if at least one of channels is closed.
298+
*/
297299
def select(selector: Selector, selectors: Selector*): Boolean = {
298-
trySelect(Duration.Inf, selector, selectors: _*)
300+
trySelect(Duration.Inf, false, selector, selectors: _*)
301+
}
302+
303+
/**
304+
* Waits for a non-blocking operation to be available on the list of channels.
305+
* If more than one channel is ready to perform its operation, the first one in the list takes precedence.
306+
*
307+
* @param selector A first channel to wait for (mandatory).
308+
* @param selectors Other channels to wait for.
309+
* @return true is none of the channels are closed and select() can be invoked again, false if at least one of channels is closed.
310+
*/
311+
def prioritySelect(selector: Selector, selectors: Selector*): Boolean = {
312+
trySelect(Duration.Inf, true, selector, selectors: _*)
299313
}
300314

301315
/**
302-
* Non-blocking check for a possibility of a non-blocking operation on several channels.
303-
*
304-
* @param selector A first channel to wait for (mandatory).
305-
* @param selectors Other channels to wait for.
306-
* @return true if one of pending operations wasn't blocking.
307-
*/
316+
* Non-blocking check for a possibility of a non-blocking operation on several channels.
317+
* If more than one channel is ready to perform its operation, the channel to perform the operation on will be chosen
318+
* at random.
319+
*
320+
* @param selector A first channel to wait for (mandatory).
321+
* @param selectors Other channels to wait for.
322+
* @return true if one of pending operations wasn't blocking.
323+
*/
308324
def trySelect(selector: Selector, selectors: Selector*): Boolean = {
309-
trySelect(Duration.Zero, selector, selectors: _*)
325+
trySelect(Duration.Zero, false, selector, selectors: _*)
310326
}
311327

312328
/**
313-
* Waits for a non-bloaking action to be available.
314-
*
315-
* @param timout A timeout to wait for a non-blocking action to be available.
316-
* @param selector A first channel to wait for (mandatory).
317-
* @param selectors Other channels to wait for.
318-
* @return true if one of pending operations wasn't blockingю
319-
*/
320-
@throws[InterruptedException]
329+
* Non-blocking check for a possibility of a non-blocking operation on several channels.
330+
*
331+
* @param selector A first channel to wait for (mandatory).
332+
* @param selectors Other channels to wait for.
333+
* @return true if one of pending operations wasn't blocking.
334+
*/
321335
def trySelect(timout: Duration, selector: Selector, selectors: Selector*): Boolean = {
336+
trySelect(timout, false, selector, selectors: _*)
337+
}
338+
339+
/**
340+
* Non-blocking check for a possibility of a non-blocking operation on several channels.
341+
* If more than one channel is ready to perform its operation, the first one in the list takes precedence.
342+
*
343+
* @param selector A first channel to wait for (mandatory).
344+
* @param selectors Other channels to wait for.
345+
* @return true if one of pending operations wasn't blocking.
346+
*/
347+
def tryPrioritySelect(selector: Selector, selectors: Selector*): Boolean = {
348+
trySelect(Duration.Zero, true, selector, selectors: _*)
349+
}
350+
351+
/**
352+
* Waits for a non-bloaking action to be available.
353+
*
354+
* @param timout A timeout to wait for a non-blocking action to be available.
355+
* @param isPriorityOrdered If true, when more then one selectors is ready, the first one in the list will be selected.
356+
* @param selector A first channel to wait for (mandatory).
357+
* @param selectors Other channels to wait for.
358+
* @return true if one of pending operations wasn't blockingю
359+
*/
360+
@throws[InterruptedException]
361+
def trySelect(timout: Duration, isPriorityOrdered: Boolean, selector: Selector, selectors: Selector*): Boolean = {
322362
val sem = new Semaphore(0)
323363

324-
// If several channels have pending messages, select randomly the channel to return
325-
val sel = scala.util.Random.shuffle(selector :: selectors.toList).toArray
364+
val sel = if (isPriorityOrdered) {
365+
// If channels are ordered by priority, retain the original order
366+
(selector :: selectors.toList).toArray
367+
} else {
368+
// If several channels have pending messages, select randomly the channel to return
369+
scala.util.Random.shuffle(selector :: selectors.toList).toArray
370+
}
326371

327372
// Add waiters
328373
var i = 0

src/test/scala/com/github/yruslan/channel/GuaranteesSuite.scala

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ package com.github.yruslan.channel
1717

1818
import java.time.Instant
1919
import java.util.concurrent.{Executors, TimeUnit}
20-
2120
import org.scalatest.wordspec.AnyWordSpec
22-
import com.github.yruslan.channel.Channel.select
21+
import com.github.yruslan.channel.Channel.{select, trySelect}
2322

2423
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
2524
import scala.concurrent._
@@ -118,11 +117,55 @@ class GuaranteesSuite extends AnyWordSpec {
118117
}
119118
}
120119

120+
"Priority is honored for priority selects" when {
121+
"several sync input sync output channels are active, a channel is selected according to priority" in {
122+
val in1 = Channel.make[Int]
123+
val in2 = Channel.make[Int]
124+
125+
val out1 = Channel.make[Int]
126+
val out2 = Channel.make[Int]
127+
128+
testFairness(in1, in2, out1, out2, isPriority = true)
129+
}
130+
131+
"several sync input async output channels are active, a channel is selected according to priority" in {
132+
val in1 = Channel.make[Int]
133+
val in2 = Channel.make[Int]
134+
135+
val out1 = Channel.make[Int](1)
136+
val out2 = Channel.make[Int](1)
137+
138+
testFairness(in1, in2, out1, out2, isPriority = true)
139+
}
140+
141+
"several async input sync output channels are active, a channel is selected according to priority" in {
142+
val in1 = Channel.make[Int](1)
143+
val in2 = Channel.make[Int](1)
144+
145+
val out1 = Channel.make[Int]
146+
val out2 = Channel.make[Int]
147+
148+
testFairness(in1, in2, out1, out2, isPriority = true)
149+
}
150+
151+
"several async input async output channels are active, a channel is selected according to priority" in {
152+
val in1 = Channel.make[Int](1)
153+
val in2 = Channel.make[Int](1)
154+
155+
val out1 = Channel.make[Int](1)
156+
val out2 = Channel.make[Int](1)
157+
158+
testFairness(in1, in2, out1, out2, isPriority = true)
159+
}
160+
}
161+
162+
121163
/* Full qualified name 'com.github.yruslan.channel.Channel' is used here to make IntelliJ IDEA happy. */
122164
private def testFairness(in1: com.github.yruslan.channel.Channel[Int],
123165
in2: com.github.yruslan.channel.Channel[Int],
124166
out1: com.github.yruslan.channel.Channel[Int],
125-
out2: com.github.yruslan.channel.Channel[Int]): Unit = {
167+
out2: com.github.yruslan.channel.Channel[Int],
168+
isPriority: Boolean = false): Unit = {
126169
val results = new ListBuffer[(Int, Int)]
127170

128171
def balancer(input1: ReadChannel[Int],
@@ -134,14 +177,18 @@ class GuaranteesSuite extends AnyWordSpec {
134177
var exit = false
135178

136179
while (!exit) {
137-
select(
180+
trySelect(
181+
Duration.Inf,
182+
isPriority,
138183
input1.recver(x => v = x),
139184
input2.recver(x => v = x),
140185
finishChannel.recver(_ => exit = true)
141186
)
142187

143188
if (!exit) {
144-
select(
189+
trySelect(
190+
Duration.Inf,
191+
isPriority,
145192
output1.sender(v) {},
146193
output2.sender(v) {}
147194
)
@@ -196,10 +243,17 @@ class GuaranteesSuite extends AnyWordSpec {
196243
assert(results.size == 100)
197244
assert(results.map(_._2).sum == 10100) // sum(1..100)*2 = 101*50*2 = 5050*2 = 10100
198245

199-
// Fairness
200246
val processedBy = Range(0, 4).map(w => results.count(_._1 == w))
201-
assert(processedBy.min > 15)
202-
assert(processedBy.max < 35)
247+
248+
if (isPriority) {
249+
// Priority
250+
assert(processedBy.min < 5)
251+
assert(processedBy.max > 45)
252+
} else {
253+
// Fairness
254+
assert(processedBy.min > 15)
255+
assert(processedBy.max < 35)
256+
}
203257
}
204258

205259
}

0 commit comments

Comments
 (0)