diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..4845af2 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,27 @@ +name: Build + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + build-sbt: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + scala: [ 2.11.12, 2.12.19, 2.13.13 ] + name: Scala ${{matrix.scala}} + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Setup JDK and sbt + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 8 + cache: sbt + - name: Build and run tests + run: sbt ++${{matrix.scala}} test diff --git a/.gitignore b/.gitignore index 9c07d4a..9a23a3b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,39 @@ +# use glob syntax. +syntax: glob +*.ser *.class -*.log +*~ +*.bak +*.old +.DS_Store + +# eclipse conf file +.settings +.classpath +.manager +.scala_dependencies + +# VsCode +.project +.bloop +.metals +metals.sbt +settings.json +sbt.json + +# idea +.idea +*.iml + +# building +target +build +null +tmp* +temp* +dist +test-output +build.log + +.cache* +dependency-reduced-pom.xml diff --git a/LICENSE b/LICENSE index 5eaac56..dc8b239 100644 --- a/LICENSE +++ b/LICENSE @@ -1,21 +1,202 @@ -MIT License - -Copyright (c) 2020 Ruslan Yushchenko - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/README.md b/README.md index e783586..70a641d 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,580 @@ -# channel -Scala implementation of the concurrency primitive similar to GoLang channels. +# Channel - a port of GoLang channels to Scala +[![Build](https://github.com/yruslan/channel_scala/workflows/Build/badge.svg)](https://github.com/yruslan/channel_scala/actions) + +> Go channels provide synchronization and messaging, 'select' provides multi-way concurrent control. +> +> _Rob Pike_ - [Concurrency is not parallelism](https://www.youtube.com/watch?v=oV9rvDllKEg) +/ [another link with better slide view](https://www.youtube.com/watch?v=qmg1CF3gZQ0) + +This is one of Scala ports of Go channels. The idea of this particular port is to match as much as possible the features +provided by GoLang so channels can be used for concurrency coordination. The library uses locks, conditional variables and +semaphores as underlying concurrency primitives so the performance is not expected to match applications written in Go. + +## Link + +| Scala 2.11 | Scala 2.12 | Scala 2.13 | +|:--------------:|:-----------------:|:------------:| +| [![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.github.yruslan/channel_scala_2.11/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.github.yruslan/channel_scala_2.12) | [![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.github.yruslan/channel_scala_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.github.yruslan/channel_scala_2.12) | [![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.github.yruslan/channel_scala_2.13/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.github.yruslan/channel_scala_2.13) | + +## Motivation +Scala provides channels as a part of the standard library (https://www.scala-lang.org/api/2.13.0/scala/concurrent/Channel.html). +However, the real power of channels comes from `select()`. + +Scala promotes Actor model as a way of handling concurrency which is based on works of Carl Hewitt (1973). Channels +are based on CSP model by Tony Hoare (1978). At first glance these models are similar, but they are in fact very different. +Good deescripton on the differences are explained here: https://stackoverflow.com/a/22622880/1038282 + +CSP channels provide an extremely simple and uniform building block for designing concurrent applications. + +## Usage + +Examples for this library are inspired by the [Go By Example](https://gobyexample.com/channels) page by [Mark McGranaghan](https://markmcgranaghan.com/). + +You can try these example in Scala REPL. + +### Ping +In this example a channel and a thread are created. A message is send from the new thread and received in the main thread. +Since synchronous channels are created by default we can be sure by the time the main thread receives the message, +the child thread is already finished. + +> *Note.* Go relies on the builtin operator `<-` for sending and receiving messages. Our implementaion defines `send()` and `recv()` methods +instead to make the code look more Scala-like. Another reason is that in Scala it is not possible to define an infix `<-`. + +```scala +import com.github.yruslan.channel.Channel +import scala.concurrent.Future + +// In these examples we always import global execution context +// so that futures could use the default thread pool. +import scala.concurrent.ExecutionContext.Implicits.global + +val channel = Channel.make[String] + +Future { channel.send("ping") } + +val msg = channel.recv() + +println(msg) +``` + +Output: +``` +ping +``` + +### Channel buffering + +Channels created by default are synchronous. That means that when a thread sends a message the method won't return +until the message is received by some thread. You may want to make channels asynchronous so that `send()` method +returns immediately. To do that you need to specify the maximum number of items the channel can store in the buffer. +As long as there is still a free space in the buffer, `send()` will return immediately. + +```scala +import com.github.yruslan.channel.Channel + +val channel = Channel.make[String](2) + +channel.send("buffered") +channel.send("channel") + +println(channel.recv()) +println(channel.recv()) + +``` +Output: +``` +buffered +channel +``` + +### Channels with unbounded buffer + +Some use cases require limited but unknown buffer size in order to satisfy progress guarantees. This port of channels +library supports unbounded channels. + +```scala +import com.github.yruslan.channel.Channel + +val channel = Channel.makeUnbounded[String] + +channel.send("unbounded") +channel.send("buffered") +channel.send("channel") + +println(channel.recv()) +println(channel.recv()) +println(channel.recv()) + +``` +Output: +``` +unbounded +buffered +channel +``` + +### Channel synchronization + +Channels can be used for synchronization. Here we use a channel to wait for a job executing in another thread +to complete. + +```scala +import com.github.yruslan.channel.Channel +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + +def worker(done: Channel[Boolean]): Unit = { + println("working...") + Thread.sleep(1000 /*1 second*/) + println("done") + + done.send(true) +} + +val done = Channel.make[Boolean] +Future { worker(done) } + +done.recv() +``` +Output: +``` +working... +done +``` + +### Directed channels + +You can define channel directions. That is channels that can either only send or only receive messages, but not both. + +When you define a method, define an argument as `ReadChannel` if you want the method to be able to only receive messages. +Define the argument as `WriteChannel` so the method can only send messages to the channel. + +```scala +import com.github.yruslan.channel._ + +def ping(pings: WriteChannel[String], msg: String): Unit = { + pings.send(msg) +} + +def pong(pings: ReadChannel[String], pongs: WriteChannel[String]): Unit = { +val msg = pings.recv() + pongs.send(msg) +} + +val pings = Channel.make[String](1) +val pongs = Channel.make[String](1) + +ping(pings, "message") +pong(pings, pongs) + +println(pongs.recv()) +``` + +Output: +``` +message +``` + +### Closing channels +Channels can be closed which prevents sending more messages to it. It can be checked by consumers to determine when +the processing has finished. + +```scala +val ch = Channel.make[Int](5) + +ch.send(1) +ch.send(2) +ch.send(3) +ch.close() + +while (!ch.isClosed) { + println(ch.recv()) +} +``` + +Output: +``` +1 +2 +3 +``` + +### Iterate over channels +You can iterate over a channel using `foreach()`. Several threads can do the same. Each thread will receive only one +copy of the sent message. Be careful, `foreach()` is blocking and will exit only when the channel is closed. If you +forget to close a channel, the foreach loop will block the thread. + +Here is an example how a stream of tasks can be processed in parallel by 2 workers using `foreach()`. + +```scala +Future { + ch.foreach(v => { + println(s"Worker 1 received $v") + Thread.sleep(500) // Simulate processing + }) +} + +Future { + ch.foreach(v => { + println(s"Worker 2 received $v") + Thread.sleep(600) // Simulate processing + }) +} + +ch.send(1) +ch.send(2) +ch.send(3) +ch.send(4) +ch.close() +``` + +Output: +``` +Worker 1 received 1 +Worker 2 received 2 +Worker 1 received 3 +Worker 2 received 4 +``` + +### Select + +What makes channels great is that a program can wait for events in several channels at the same time. +In this example `select()` is used to wait for any of two channels to have an incoming message. Once a message +is available it is received. + +```scala +import com.github.yruslan.channel.Channel +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + +val channel1 = Channel.make[String] +val channel2 = Channel.make[String] + +Future { + Thread.sleep(1000 /*1 second*/) + channel1.send("one") +} + +Future { + Thread.sleep(2000 /*2 seconds*/) + channel2.send("two") +} + +for (_ <- Range(0, 2)) { + Channel.select( + channel1.recver{v => println(v)}, + channel2.recver{v => println(v)} + ) +} +``` +Output: +``` +one +two +``` + +#### Default blocks +Like in Go, you can have default blocks that will be executed if none of other blocks are ready: + +```scala +Channel.select( + channel1.recver{v => println(v)}, + channel2.sender(v) { /* An action to do after the send. */ }, + Channel.default { /* The action to do if nether channel1 nor channel2 can receive and send. */ } +) +``` + +#### Time.after() +In Go, you can use `Time.After(duration)` to generate a channel that sends a message after the specified time has +passed. You don't need to close the channel afterward. The Scala implementation is `TimeChannels.after(duration)` + +```scala +import com.github.yruslan.channel._ + +Channel.select( + someChannel.recver { v => println(s"Got a message in time: $v") }, + TimeChannels.after(Duration(10, TimeUnit.SECONDS)).recver { v => println("Time is out!") } +) +``` + +#### Time.newTicker() +In Go, you can use `Time.newTicker(duration)` to generate a channel that sends a message after the specified time. +When the message is consumed, the ticker will generate another message after the same time duration. +Tickers should be closed after they are not in use. The Scala implementation is `TimeChannels.ticker(duration)` + +```scala +import com.github.yruslan.channel._ + +val ticker = TimeChannels.ticker(Duration(10, TimeUnit.SECONDS)) + +Channel.select( + someChannel.recver { v => println(s"Got a message: $v") }, + ticker.recver { _ => println("Tick.") } +) + +ticker.close() +``` + +#### WaitGroup +Similar to GoLang you can use `WaitGroup` to coordinate multiple threads doing work. Use `add()`, `done()`, and `await()`. +```scala +val wg = WaitGroup() +val n = 10 +for (_ <- 1 to n) { + wg.add() + new Thread { + // do some work + wg.done() + }.start() +} +wg.await() +``` + +### Non-blocking methods +Go supports non-blocking channel operation by the elegant `default` clause in the `select` statement. The scala port +adds separate methods that support non-blocking operations: `trySend()`, `tryRecv()` and `trySelect()`. There is an +optional timeout parameter for each of these methods. If it is not specified, all methods return immediately without +any waiting. If the timeout is specified the methods will wait the specified amount of time if the expected conditions +are not met. Timeout can be set to `Duration.Inf`. In this case these methods are equivalent to their blocking variants +with the exception of the type of returned value. + +* `trySend()` returns a boolean. If `true`, the message has been sent successfully, otherwise it is failed for whatever + reason (maybe the channel was closed or the buffer is full). +* `tryRecv()` returns a optional value. If there are no pending messages the method returns `None`. +* `trySelect()` returns true if any of specified operations have executed. + +Here is an example of non-blocking methods: +```scala +val ch1 = Channel.make[Int](1) +val ch2 = Channel.make[String](1) + +var ok = ch1.trySend(1) +println(s"msg1 -> channel1: $ok") + +ok = ch1.trySend(2) +println(s"msg2 -> channel1: $ok") + +val msg1 = ch1.tryRecv() +println(s"msg1 <- channel1: $msg1") + +val msg2 = ch1.tryRecv() +println(s"msg2 <- channel1: $msg2") + +val okSelect = Channel.trySelect( + ch1.recver{v => println(v)}, + ch2.recver{v => println(v)}) +println(s"selected: $okSelect") +``` + +Output: +``` +msg1 -> channel1: true +msg2 -> channel1: false +msg1 <- channel1: Some(1) +msg2 <- channel1: false +selected: None +``` + +### General pattern for select() +Since channels can be copied back and forth netween threads, each channel can have multiple readers and writers. So +if `sclect()` returns a channel there are no guarantees that another thread won't fetch the message before the current +thread can receive it. + +Here is an example where the worker is written so it would work correctly in case channels have multiple readers and +writers. + +```scala +import com.github.yruslan.channel.Channel +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global +import Channel._ + +def worker(channel1: Channel[Int], channel2: Channel[String]): Unit = { + while(!channel1.isClosed && !channel2.isClosed) { + select( + channel1.recver{i => println(s"Int received: $i")}, + channel2.recver{s => println(s"String received: $s")} + ) + } +} + +val channell = Channel.make[Int] +val channel2 = Channel.make[String] + +val fut = Future { + worker(channell, channel2) +} + +channell.send(1) +channel2.send("abc") +channell.send(2) +channel2.send("edef") +channell.close() +channel2.close() +``` + +Output: +``` +Int received: 1 +String received: abc +Int received: 2 +String received: edef +``` + +### A balancer example +Here is example function that balances inputs from two channels into two output channels. +Notice that `select()` is used to wait for any of the channels to have an incoming message, as well as +to select a free output channel. You can mix `sender()` and `recver()` calls in the same `select()` statement. +A finish channel is used to signal the end of the balancing process. + +```scala +def balancer(input1: ReadChannel[Int], + input2: ReadChannel[Int], + output1: WriteChannel[Int], + output2: WriteChannel[Int], + finishChannel: ReadChannel[Boolean]): Unit = { + var v: Int = 0 + var exit = false + + while (!exit) { + select( + input1.recver(x => v = x), + input2.recver(x => v = x), + finishChannel.recver(_ => exit = true) + ) + + if (!exit) { + select( + output1.sender(v) {}, + output2.sender(v) {} + ) + } + } +} +``` + +### Scala-specific channel features +Since Scala is a functional language, this implementation of channels supports functional operations used in for comprehension. + +#### map() + +You can lazy map messages in a channel. + +```scala +// Creating a channel of integers +val chInt = Channel.make[Int](2) + +// The channel of strings is the result of mapping the channel of integer +val chString = chInt.map(v => v.toString) + +// Send some integers +chInt.send(1) +chInt.send(2) +chInt.close() + +// Receive some strings +val s1: String = chString.recv() +val s2: String = chString.recv() +``` + +#### filter() + +You can lazy filer messages in a channel. + +```scala +// Creating a channel of integers +val chInput = Channel.make[Int](3) + +// Filter the original channel +val chFiltered = chInput.filter(v => v != 2) + +// Send some integers +chInput.send(1) +chInput.send(2) +chInput.send(3) +chInput.close() + +// Receive filtered values +val v1 = chFiltered.recv() // 1 +val v2 = chFiltered.recv() // 3 +``` + +#### for() comprehension + +You can use `for` comprehension for channels. + +```scala +// Creating a channel of integers +val chInput = Channel.make[Int](3) + +// Applying maps and filters +val chOutput = chInput + .map(v => v * 2) + .filter(v => v != 4) + +// Sending values to the input channel +chInput.send(1) +chInput.send(2) +chInput.send(3) +chInput.close() + +// Traversing the output channel using for comprehension +for { + a <- chOutput + if a > 5 +} println(a) + +// Outputs: +// 6 +``` + +#### toList + +You can convert a channel to `List` by collecting all messages. This operation will block until the channel is closed. + +```scala +val chInput = Channel.make[Int](3) + +chInput.send(1) +chInput.send(2) +chInput.send(3) +chInput.close() + +val lst = chInput.toList // List(1, 2, 3) +``` + +## Changelog +- #### 0.2.1 released May 11, 2024. + - Add [wait groups](https://github.com/yruslan/channel_scala/tree/master?tab=readme-ov-file#waitgroup). + - Improve the performance of time-based channels. + +- #### 0.2.0 released Feb 19, 2024. + - Add support for `default` block in `select()`. + - Add `after()` and `ticker()`. + - Change select() logic for synchronous channels to match behavior of GoLang. + +- #### 0.1.6 released Dec 3, 2023. + - Fix channel filtering does not filter some values at random. + +- #### 0.1.5 released Nov 26, 2023. + - Add handling of `InterruptedException` that can occur while waiting on a channel. + - Add support for priority `prioritySelect()` for channels. When several selectors are ready the first one will take precedence. + - Fix race condition in `foreach()` when an exception is thrown inside the action. + +- #### 0.1.4 released Sep 8, 2022. + - Add covariance for read-only channels. + - E.g. `val ch1: ReadChannel[Animal] = ch2: ReadChannel[Dog]`. + - Add contravariance for write-only channels. + - E.g. `val ch1: Writehannel[Dog] = ch2: Writehannel[Animal]`. + +- #### 0.1.3 released Mar 13, 2022. + - Add support for unbounded channels (use `Channel.makeUnbounded[MyType]()`. + +- #### 0.1.2 released Jan 13, 2022. + - Add support for `map()`, `filter()` and `for` comprehension for channels. + +- #### 0.1.1 released May 11, 2021. + - Fix one corner case of trySelect(). + +- #### 0.1.0 released Feb 17, 2021. + - The initial release. diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..bb9cda0 --- /dev/null +++ b/build.sbt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ScalacOptions._ + +lazy val scala211 = "2.11.12" +lazy val scala212 = "2.12.19" +lazy val scala213 = "2.13.13" + +name := "channel_scala" +organization := "com.github.yruslan" + +scalaVersion := scala212 +crossScalaVersions := Seq(scala211, scala212, scala213) + +scalacOptions := scalacOptionsFor(scalaVersion.value) + +libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.18" % "test" + +releasePublishArtifactsAction := PgpKeys.publishSigned.value + +releaseCrossBuild := true +addCommandAlias("releaseNow", ";set releaseVersionBump := sbtrelease.Version.Bump.Bugfix; release with-defaults") diff --git a/project/ScalacOptions.scala b/project/ScalacOptions.scala new file mode 100644 index 0000000..e06e681 --- /dev/null +++ b/project/ScalacOptions.scala @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import sbt._ + +object ScalacOptions { + val scalacOptionsForAllVersions = Seq( + "-encoding", "UTF-8", // Source files are in UTF-8 + "-unchecked", // Warn about unchecked type parameters + "-deprecation", // Warn about use of deprecated APIs + "-feature", // Warn about misused language features + "-explaintypes" // Explain type errors in more detail + ) + + val compilerWarningOptions = Seq( + "-opt-warnings", // Enable optimizer warnings + "-opt:l:inline", // Enable inlining + "-opt:l:method", // Enable method-local optimizations + "-opt-inline-from:com.github.yruslan.channel.**" // Enable inlining within the livrary package + ) + + lazy val scalacOptions211 = scalacOptionsForAllVersions ++ + Seq( + "-Xsource:2.11", // Treat compiler input as Scala source for scala-2.11 + "-target:jvm-1.8" // Target JVM 1.8 + ) + + lazy val scalacOptions212 = scalacOptionsForAllVersions ++ compilerWarningOptions ++ + Seq( + "-Xsource:2.12", // Treat compiler input as Scala source for scala-2.12 + "-release:8" // Target JVM 1.8 + ) + + lazy val scalacOptions213 = scalacOptionsForAllVersions ++ compilerWarningOptions ++ + Seq( + "-Xsource:2.13", // Treat compiler input as Scala source for scala-2.13 + "-release:8" // Target JVM 1.8 + ) + + def scalacOptionsFor(scalaVersion: String): Seq[String] = { + val scalacOptions = CrossVersion.partialVersion(scalaVersion) match { + case Some((2, minor)) if minor >= 13 => + scalacOptions213 + case Some((2, minor)) if minor == 12 => + scalacOptions212 + case _ => + scalacOptions211 + } + println(s"Scala $scalaVersion compiler options: ${scalacOptions.mkString(" ")}") + scalacOptions + } +} diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..04267b1 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.9.9 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..d3e5d63 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.0") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.0") +addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.2.0") +addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0") diff --git a/publish.sbt b/publish.sbt new file mode 100644 index 0000000..59a4229 --- /dev/null +++ b/publish.sbt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ThisBuild / scmInfo := Some( + ScmInfo( + browseUrl = url("https://github.com/yruslan/channel_scala/tree/master"), + connection = "scm:git:git://github.com/yruslan/channel_scala.git", + devConnection = "scm:git:ssh://github.com/yruslan/channel_scala.git" + ) +) + +ThisBuild / developers := List( + Developer( + id = "yruslan", + name = "Ruslan Iushchenko", + email = "yruslan@gmail.com", + url = url("https://github.com/yruslan") + ) +) + +ThisBuild / homepage := Some(url("https://github.com/yruslan/channel_scala")) +ThisBuild / description := "A port of GoLang channels to Scala" +ThisBuild / startYear := Some(2020) +ThisBuild / licenses += "Apache-2.0" -> url("https://www.apache.org/licenses/LICENSE-2.0.txt") + +ThisBuild / pomIncludeRepository := { _ => false } +ThisBuild / publishTo := { + val nexus = "https://oss.sonatype.org/" + if (isSnapshot.value) { + Some("snapshots" at s"${nexus}content/repositories/snapshots") + } else { + Some("releases" at s"${nexus}service/local/staging/deploy/maven2") + } +} +ThisBuild / publishMavenStyle := true diff --git a/src/main/scala/com/github/yruslan/channel/AsyncChannel.scala b/src/main/scala/com/github/yruslan/channel/AsyncChannel.scala new file mode 100644 index 0000000..d0cb9da --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/AsyncChannel.scala @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.impl.Awaiter + +import scala.collection.mutable +import scala.concurrent.duration.Duration + +class AsyncChannel[T](maxCapacity: Int) extends Channel[T] { + require(maxCapacity > 0) + + protected val q = new mutable.Queue[T] + + final override def close(): Unit = { + lock.lock() + try { + if (!closed) { + closed = true + readWaiters.foreach(w => w.sem.release()) + writeWaiters.foreach(w => w.sem.release()) + crd.signalAll() + cwr.signalAll() + } + } finally { + lock.unlock() + } + } + + @throws[InterruptedException] + final override def send(value: T): Unit = { + lock.lock() + try { + if (closed) { + throw new IllegalStateException(s"Attempt to send to a closed channel.") + } + + writers += 1 + while (q.size == maxCapacity && !closed) { + awaitWriters() + } + + if (!closed) { + q.enqueue(value) + } + notifyReaders() + writers -= 1 + + } finally { + lock.unlock() + } + } + + final override def trySend(value: T): Boolean = { + lock.lock() + try { + if (closed) { + false + } else { + if (q.size == maxCapacity) { + false + } else { + q.enqueue(value) + notifyReaders() + true + } + } + } finally { + lock.unlock() + } + } + + @throws[InterruptedException] + final override def trySend(value: T, timeout: Duration): Boolean = { + if (timeout == Duration.Zero) { + return trySend(value) + } + + val awaiter = new Awaiter(timeout) + + lock.lock() + try { + writers += 1 + var isTimeoutExpired = false + while (!closed && !hasCapacity && !isTimeoutExpired) { + isTimeoutExpired = !awaitWriters(awaiter) + } + writers -= 1 + + if (!closed && hasCapacity) { + q.enqueue(value) + notifyReaders() + true + } else { + false + } + } finally { + lock.unlock() + } + } + + @throws[InterruptedException] + final override def recv(): T = { + lock.lock() + try { + readers += 1 + while (!closed && q.isEmpty) { + awaitReaders() + } + + if (closed && q.isEmpty) { + throw new IllegalStateException(s"Attempt to receive from a closed channel.") + } + + val v: T = q.dequeue() + readers -= 1 + + notifyWriters() + v + } finally { + lock.unlock() + } + } + + final override def tryRecv(): Option[T] = { + lock.lock() + try { + if (closed && q.isEmpty) { + None + } else { + if (q.isEmpty) { + None + } else { + val v = q.dequeue() + notifyWriters() + Option(v) + } + } + } finally { + lock.unlock() + } + } + + @throws[InterruptedException] + final override def tryRecv(timeout: Duration): Option[T] = { + if (timeout == Duration.Zero) { + return tryRecv() + } + + val awaiter = new Awaiter(timeout) + + lock.lock() + try { + readers += 1 + var isTimeoutExpired = false + while (!closed && !hasMessages && !isTimeoutExpired) { + isTimeoutExpired = !awaitReaders(awaiter) + } + readers -= 1 + + fetchValueOpt() + } finally { + lock.unlock() + } + } + + final override def isClosed: Boolean = { + lock.lock() + val result = if (q.nonEmpty) { + false + } else { + closed + } + lock.unlock() + result + } + + /* This method assumes the lock is being held. */ + final override protected def hasCapacity: Boolean = { + q.size < maxCapacity + } + + /* This method assumes the lock is being held. */ + final override protected def hasMessages: Boolean = { + q.nonEmpty + } + + /* This method assumes the lock is being held. */ + final override protected def fetchValueOpt(): Option[T] = { + if (q.isEmpty) { + None + } else { + notifyWriters() + Option(q.dequeue()) + } + } + + /** This method is for internal use only. It is used for performance-optimized special channels. */ + final private[channel] def sendAndClose(value: T): Unit = { + lock.lock() + try { + if (closed) { + throw new IllegalStateException(s"Attempt to send to a closed channel.") + } + + q.enqueue(value) + closed = true + readWaiters.foreach(w => w.sem.release()) + writeWaiters.foreach(w => w.sem.release()) + crd.signalAll() + cwr.signalAll() + } finally { + lock.unlock() + } + } +} diff --git a/src/main/scala/com/github/yruslan/channel/Channel.scala b/src/main/scala/com/github/yruslan/channel/Channel.scala new file mode 100644 index 0000000..79760c6 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/Channel.scala @@ -0,0 +1,491 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.Channel.{CLOSED, SUCCESS, WAITING_REQUIRED} +import com.github.yruslan.channel.impl.{Awaiter, Selector, SimpleLinkedList, Waiter} + +import java.util.concurrent.locks.{Condition, ReentrantLock} +import java.util.concurrent.{Semaphore, TimeUnit} +import scala.concurrent.duration.Duration +import scala.util.Random + +abstract class Channel[T] extends ReadChannel[T] with WriteChannel[T] { + protected var readers: Int = 0 + protected var writers: Int = 0 + protected var closed = false + + protected val readWaiters = new SimpleLinkedList[Waiter] + protected val writeWaiters = new SimpleLinkedList[Waiter] + + // Scala & Java monitors are designed so each object can act as a mutex and a condition variable. + // But this makes impossible to use a single lock for more than one condition. + // So a lock from [java.util.concurrent.locks] is used instead. It allows to have several condition + // variables that use a single lock. + protected val lock = new ReentrantLock() + protected val crd: Condition = lock.newCondition() + protected val cwr: Condition = lock.newCondition() + + final override def fornew[U](f: T => U): Unit = { + var valueOpt = tryRecv() + + while (valueOpt.nonEmpty) { + valueOpt.foreach(v => f(v)) + valueOpt = tryRecv() + } + } + + @throws[InterruptedException] + final override def foreach[U](f: T => U): Unit = { + while (true) { + var valOpt: Option[T] = None + + lock.lock() + try { + readers += 1 + while (!closed && !hasMessages) { + awaitReaders() + } + readers -= 1 + if (isClosed) { + return + } + + valOpt = fetchValueOpt() + } finally { + lock.unlock() + } + + valOpt.foreach(f) + } + } + + protected def fetchValueOpt(): Option[T] + + final override def sender(value: T)(action: => Unit = {}): Selector = { + new Selector(true, false, this) { + override def sendRecv(waiterOpt: Option[Waiter]): Int = { + lock.lock() + try { + if (closed) { + CLOSED + } else { + val ok = trySend(value) + if (ok) { + SUCCESS + } else { + waiterOpt.foreach(waiter => writeWaiters.append(waiter)) + WAITING_REQUIRED + } + } + } finally { + lock.unlock() + } + } + + override def afterAction(): Unit = action + } + } + + final override def recver(action: T => Unit): Selector = { + new Selector(false, false, this) { + var el: T = _ + + override def sendRecv(waiterOpt: Option[Waiter]): Int = { + lock.lock() + try { + val opt = tryRecv() + opt.foreach(v => el = v) + if (opt.isEmpty) { + if (closed) { + CLOSED + } else { + waiterOpt.foreach(waiter => readWaiters.append(waiter)) + WAITING_REQUIRED + } + } else { + SUCCESS + } + } finally { + lock.unlock() + } + } + + override def afterAction(): Unit = action(el) + } + } + + /* This method assumes the lock is being held. */ + final protected def notifyReaders(): Unit = { + if (readers > 0) { + crd.signal() + } else { + if (!readWaiters.isEmpty) { + readWaiters.returnHeadAndRotate().sem.release() + } + } + } + + /* This method assumes the lock is being held. */ + final protected def notifyWriters(): Unit = { + if (writers > 0) { + cwr.signal() + } else { + if (!writeWaiters.isEmpty) { + writeWaiters.returnHeadAndRotate().sem.release() + } + } + } + + /* This method assumes the lock is being held. */ + protected def hasCapacity: Boolean + + /* This method assumes the lock is being held. */ + protected def hasMessages: Boolean + + /* This method assumes the lock is being held. */ + @throws[InterruptedException] + final protected def awaitWriters(): Unit = { + try { + cwr.await() + } catch { + case ex: Throwable => + writers -= 1 + cwr.signal() + throw ex + } + } + + /* This method assumes the lock is being held. */ + @throws[InterruptedException] + final protected def awaitWriters(awaiter: Awaiter): Boolean = { + try { + awaiter.await(cwr) + } catch { + case ex: Throwable => + writers -= 1 + cwr.signal() + throw ex + } + } + + /* This method assumes the lock is being held. */ + @throws[InterruptedException] + final protected def awaitReaders(): Unit = { + try { + crd.await() + } catch { + case ex: Throwable => + readers -= 1 + crd.signal() + throw ex + } + } + + /* This method assumes the lock is being held. */ + @throws[InterruptedException] + final protected def awaitReaders(awaiter: Awaiter): Boolean = { + try { + awaiter.await(crd) + } catch { + case ex: Throwable => + readers -= 1 + crd.signal() + throw ex + } + } + + @inline + final private def delReaderWaiter(waiter: Waiter): Unit = { + lock.lock() + try { + readWaiters.remove(waiter) + } finally { + lock.unlock() + } + } + + @inline + final private def delWriterWaiter(waiter: Waiter): Unit = { + lock.lock() + try { + writeWaiters.remove(waiter) + } finally { + lock.unlock() + } + } +} + +object Channel { + val SUCCESS = 0 + val WAITING_REQUIRED = 1 + val CLOSED = 2 + + /** + * Create a synchronous channel. + * + * @tparam T The type of the channel. + * @return A new channel + */ + def make[T]: Channel[T] = { + new SyncChannel[T] + } + + /** + * Create a channel. By default a synchronous channel will be created. + * If bufferSize is greater then zero, a buffered channel will be created. + * + * @param bufferSize Asynchronous buffer size. + * @tparam T The type of the channel. + * @return A new channel + */ + def make[T](bufferSize: Int): Channel[T] = { + require(bufferSize >= 0) + + if (bufferSize > 0) { + new AsyncChannel[T](bufferSize) + } else { + new SyncChannel[T] + } + } + + /** + * Create an unbounded asynchronous channel. + * + * @tparam T The type of the channel. + * @return A new channel + */ + def makeUnbounded[T]: Channel[T] = { + new AsyncChannel[T](Int.MaxValue) + } + + /** + * Waits for a non-blocking operation to be available on the list of channels. + * If more than one channel is ready to perform its operation, the channel to perform the operation on will be chosen + * at random. + * + * @param selector A first channel to wait for (mandatory). + * @param selectors Other channels to wait for. + * @return true is none of the channels are closed and select() can be invoked again, false if at least one of channels is closed. + */ + def select(selector: Selector, selectors: Selector*): Boolean = { + trySelect(Duration.Inf, false, selector, selectors: _*) + } + + /** + * Waits for a non-blocking operation to be available on the list of channels. + * If more than one channel is ready to perform its operation, the first one in the list takes precedence. + * + * @param selector A first channel to wait for (mandatory). + * @param selectors Other channels to wait for. + * @return true is none of the channels are closed and select() can be invoked again, false if at least one of channels is closed. + */ + def prioritySelect(selector: Selector, selectors: Selector*): Boolean = { + trySelect(Duration.Inf, true, selector, selectors: _*) + } + + /** + * Non-blocking check for a possibility of a non-blocking operation on several channels. + * If more than one channel is ready to perform its operation, the channel to perform the operation on will be chosen + * at random. + * + * @param selector A first channel to wait for (mandatory). + * @param selectors Other channels to wait for. + * @return true if one of pending operations wasn't blocking. + */ + def trySelect(selector: Selector, selectors: Selector*): Boolean = { + trySelect(Duration.Zero, false, selector, selectors: _*) + } + + /** + * Non-blocking check for a possibility of a non-blocking operation on several channels. + * + * @param selector A first channel to wait for (mandatory). + * @param selectors Other channels to wait for. + * @return true if one of pending operations wasn't blocking. + */ + def trySelect(timout: Duration, selector: Selector, selectors: Selector*): Boolean = { + trySelect(timout, false, selector, selectors: _*) + } + + /** + * Non-blocking check for a possibility of a non-blocking operation on several channels. + * If more than one channel is ready to perform its operation, the first one in the list takes precedence. + * + * @param selector A first channel to wait for (mandatory). + * @param selectors Other channels to wait for. + * @return true if one of pending operations wasn't blocking. + */ + def tryPrioritySelect(selector: Selector, selectors: Selector*): Boolean = { + trySelect(Duration.Zero, true, selector, selectors: _*) + } + + /** + * Waits for a non-bloaking action to be available. + * + * @param timout A timeout to wait for a non-blocking action to be available. + * @param isPriorityOrdered If true, when more then one selectors is ready, the first one in the list will be selected. + * @param selector A first channel to wait for (mandatory). + * @param selectors Other channels to wait for. + * @return true if one of pending operations wasn't blocking. + */ + @throws[InterruptedException] + final def trySelect(timout: Duration, isPriorityOrdered: Boolean, selector: Selector, selectors: Selector*): Boolean = { + val sel = (selector +: selectors).toArray + + if (!isPriorityOrdered) { + shuffleArray(sel) + } + + if (ifHasDefaultProcessSelectors(sel)) + return true + + val waiter = new Waiter(new Semaphore(0), Thread.currentThread().getId) + + // Add waiters + var i = 0 + while (i < sel.length) { + val s = sel(i) + val status = s.sendRecv(Some(waiter)) + if (status == SUCCESS) { + removeWaiters(waiter, sel, i) + s.afterAction() + return true + } + i += 1 + } + + while (true) { + // Re-checking all channels + i = 0 + while (i < sel.length) { + val s = sel(i) + val status = s.sendRecv(None) + + if (status == SUCCESS) { + removeWaiters(waiter, sel, sel.length) + s.afterAction() + return true + } else if (status == CLOSED) { + removeWaiters(waiter, sel, sel.length) + return false + } + i += 1 + } + + val success = try { + if (timout.isFinite) { + waiter.sem.tryAcquire(timout.toMillis, TimeUnit.MILLISECONDS) + } else { + waiter.sem.acquire() + true + } + } catch { + case ex: Throwable => + removeWaiters(waiter, sel, sel.length) + throw ex + } + + if (!success) { + removeWaiters(waiter, sel, sel.length) + return false + } + } + // This never happens since the method can only exit on other return paths + false + } + + final def default(action: => Unit): Selector = { + new Selector(false, true, null) { + override def sendRecv(waiterOpt: Option[Waiter]): Int = { + SUCCESS + } + + override def afterAction(): Unit = action + } + } + + final private def ifHasDefaultProcessSelectors(selectors: Array[Selector]): Boolean = { + var i = 0 + var defaults = 0 + var defaultSelectorIndex = -1 + while (i < selectors.length) { + if (selectors(i).isDefault) { + defaultSelectorIndex = i + defaults += 1 + } + i += 1 + } + + if (defaults == 1) { + selectWithDefault(selectors, defaultSelectorIndex) + true + } else if (defaults > 1) { + throw new IllegalArgumentException("Only one default selector is allowed.") + } else { + false + } + } + + /** + * Activates one of selectors if available, executes the default selector if no other selectors are available. + * + * @param selectors Channel selectors to wait for. + */ + @throws[InterruptedException] + final private def selectWithDefault(selectors: Array[Selector], defaultSelectorIndex: Int): Unit = { + var i = 0 + while (i < selectors.length) { + val s = selectors(i) + if (!s.isDefault) { + val status = s.sendRecv(None) + if (status == SUCCESS) { + s.afterAction() + return + } + } + i += 1 + } + + selectors(defaultSelectorIndex).afterAction() + } + + + @inline + final private def removeWaiters(waiter: Waiter, sel: Array[Selector], numberOfWaiters: Int): Unit = { + var j = 0 + while (j < numberOfWaiters) { + if (sel(j).isSender) { + sel(j).channel.delWriterWaiter(waiter) + } else { + sel(j).channel.delReaderWaiter(waiter) + } + j += 1 + } + } + + final private def shuffleArray(array: Array[Selector]): Unit = { + val random = new Random() + var i = array.length - 1 + while (i > 0) { + val j = random.nextInt(i + 1) + val temp = array(i) + array(i) = array(j) + array(j) = temp + i -= 1 + } + } +} diff --git a/src/main/scala/com/github/yruslan/channel/ChannelDecorator.scala b/src/main/scala/com/github/yruslan/channel/ChannelDecorator.scala new file mode 100644 index 0000000..0edf26c --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/ChannelDecorator.scala @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +abstract class ChannelDecorator[T](inputChannel: ReadChannel[T]) extends ChannelLike { + override def isClosed: Boolean = inputChannel.isClosed +} diff --git a/src/main/scala/com/github/yruslan/channel/ChannelDecoratorFilter.scala b/src/main/scala/com/github/yruslan/channel/ChannelDecoratorFilter.scala new file mode 100644 index 0000000..e979662 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/ChannelDecoratorFilter.scala @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.impl.Selector + +import java.time.Instant +import java.time.Instant.now +import scala.concurrent.duration.{Duration, MILLISECONDS} + +class ChannelDecoratorFilter[T](inputChannel: ReadChannel[T], pred: T => Boolean) extends ChannelDecorator[T](inputChannel) with ReadChannel[T] { + @throws[InterruptedException] + override def recv(): T = { + var v = inputChannel.recv() + var found = pred(v) + + while (!found) { + v = inputChannel.recv() + found = pred(v) + } + v + } + + override def tryRecv(): Option[T] = { + var valueOpt = inputChannel.tryRecv() + var found = valueOpt.isEmpty || valueOpt.forall(v => pred(v)) + + while (!found) { + valueOpt = inputChannel.tryRecv() + found = valueOpt.isEmpty || valueOpt.forall(v => pred(v)) + } + valueOpt + } + + @throws[InterruptedException] + override def tryRecv(timeout: Duration): Option[T] = { + if (timeout == Duration.Zero) { + return tryRecv() + } + + val timeoutMilli = if (timeout.isFinite) timeout.toMillis else 0L + val startInstant = Instant.now() + var elapsedTime = 0L + + while (elapsedTime <= timeoutMilli) { + val newTimeout = Duration(timeoutMilli - elapsedTime, MILLISECONDS) + val valueOpt = inputChannel.tryRecv(newTimeout) + val found = valueOpt.isEmpty || valueOpt.forall(v => pred(v)) + elapsedTime = java.time.Duration.between(startInstant, now).toMillis + if (found) { + return valueOpt + } + } + None + } + + override def recver(action: T => Unit): Selector = inputChannel.recver(t => if (pred(t)) action(t)) + + override def fornew[U](action: T => U): Unit = inputChannel.fornew(t => if (pred(t)) action(t)) + + @throws[InterruptedException] + override def foreach[U](action: T => U): Unit = inputChannel.foreach(t => if (pred(t)) action(t)) +} diff --git a/src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala b/src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala new file mode 100644 index 0000000..2f3165f --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel +import com.github.yruslan.channel.impl.Selector + +import scala.concurrent.duration.Duration + +class ChannelDecoratorMap[T, U](inputChannel: ReadChannel[T], f: T => U) extends ChannelDecorator[T](inputChannel) with ReadChannel[U] { + @throws[InterruptedException] + override def recv(): U = f(inputChannel.recv()) + + override def tryRecv(): Option[U] = inputChannel.tryRecv().map(f) + + @throws[InterruptedException] + override def tryRecv(timeout: Duration): Option[U] = inputChannel.tryRecv(timeout).map(f) + + override def recver(action: U => Unit): Selector = inputChannel.recver(t => action(f(t))) + + override def fornew[K](action: U => K): Unit = inputChannel.fornew(t => action(f(t))) + + @throws[InterruptedException] + override def foreach[K](action: U => K): Unit = inputChannel.foreach(t => action(f(t))) +} diff --git a/src/main/scala/com/github/yruslan/channel/ChannelLike.scala b/src/main/scala/com/github/yruslan/channel/ChannelLike.scala new file mode 100644 index 0000000..acaf356 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/ChannelLike.scala @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +trait ChannelLike { + def isClosed: Boolean +} diff --git a/src/main/scala/com/github/yruslan/channel/ReadChannel.scala b/src/main/scala/com/github/yruslan/channel/ReadChannel.scala new file mode 100644 index 0000000..6f1895c --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/ReadChannel.scala @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.impl.Selector + +import scala.collection.mutable.ListBuffer +import scala.concurrent.duration.Duration + +trait ReadChannel[+T] extends ChannelLike { + @throws[InterruptedException] + def recv(): T + def tryRecv(): Option[T] + + @throws[InterruptedException] + def tryRecv(timeout: Duration): Option[T] + + def recver(action: T => Unit): Selector + + def fornew[U](f: T => U): Unit + + @throws[InterruptedException] + def foreach[U](f: T => U): Unit + + def map[U](f: T => U): ReadChannel[U] = new ChannelDecoratorMap[T, U](this, f) + def filter(f: T => Boolean): ReadChannel[T] = new ChannelDecoratorFilter[T](this, f) + def withFilter(f: T => Boolean): ReadChannel[T] = new ChannelDecoratorFilter[T](this, f) + + def toList: List[T] = { + val lst = new ListBuffer[T] + foreach(v => lst += v) + lst.toList + } +} diff --git a/src/main/scala/com/github/yruslan/channel/SyncChannel.scala b/src/main/scala/com/github/yruslan/channel/SyncChannel.scala new file mode 100644 index 0000000..e09750c --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/SyncChannel.scala @@ -0,0 +1,269 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.impl.Awaiter + +import scala.concurrent.duration.Duration + +class SyncChannel[T] extends Channel[T] { + protected var syncValue: Option[T] = None + protected var sender: Long = -1 + + @throws[InterruptedException] + final override def close(): Unit = { + lock.lock() + try { + if (!closed) { + closed = true + readWaiters.foreach(w => w.sem.release()) + writeWaiters.foreach(w => w.sem.release()) + crd.signalAll() + cwr.signalAll() + + writers += 1 + while (syncValue.nonEmpty) { + awaitWriters() + } + writers -= 1 + } + } finally { + lock.unlock() + } + } + + @throws[InterruptedException] + final override def send(value: T): Unit = { + lock.lock() + try { + if (closed) { + throw new IllegalStateException(s"Attempt to send to a closed channel.") + } + + writers += 1 + while (hasMessages && !closed) { + awaitWriters() + } + if (!closed) { + syncValue = Option(value) + sender = Thread.currentThread().getId + notifySyncReaders() + + while (syncValue.nonEmpty && !closed) { + awaitWriters() + } + notifyWriters() + } + writers -= 1 + } finally { + lock.unlock() + } + } + + final override def trySend(value: T): Boolean = { + lock.lock() + try { + if (closed) { + false + } else { + if (!hasCapacity) { + false + } else { + syncValue = Option(value) + sender = Thread.currentThread().getId + notifySyncReaders() + true + } + } + } finally { + lock.unlock() + } + } + + @throws[InterruptedException] + final override def trySend(value: T, timeout: Duration): Boolean = { + if (timeout == Duration.Zero) { + return trySend(value) + } + + val awaiter = new Awaiter(timeout) + + lock.lock() + try { + writers += 1 + var isTimeoutExpired = false + while (!closed && !hasCapacity && !isTimeoutExpired) { + isTimeoutExpired = !awaitWriters(awaiter) + } + + val isSucceeded = syncValue match { + case Some(_) => + false + case None if closed => + false + case None if !hasCapacity => + false + case None => + syncValue = Option(value) + sender = Thread.currentThread().getId + notifySyncReaders() + true + } + writers -= 1 + isSucceeded + } finally { + lock.unlock() + } + } + + @throws[InterruptedException] + final override def recv(): T = { + lock.lock() + try { + readers += 1 + if (!closed && syncValue.isEmpty) { + notifyWriters() + } + while (!closed && syncValue.isEmpty) { + awaitReaders() + } + + if (closed && syncValue.isEmpty) { + throw new IllegalStateException(s"Attempt to receive from a closed channel.") + } + + val v: T = syncValue.get + syncValue = None + sender = -1 + readers -= 1 + notifyWriters() + v + } finally { + lock.unlock() + } + } + + final override def tryRecv(): Option[T] = { + lock.lock() + try { + if (closed && syncValue.isEmpty) { + None + } else { + if (syncValue.isEmpty) { + None + } else if (sender == Thread.currentThread().getId) { + notifySyncReaders() + None + } else { + val v = syncValue + syncValue = None + sender = -1 + notifyWriters() + v + } + } + } finally { + lock.unlock() + } + } + + @throws[InterruptedException] + final override def tryRecv(timeout: Duration): Option[T] = { + if (timeout == Duration.Zero) { + return tryRecv() + } + + val awaiter = new Awaiter(timeout) + + lock.lock() + try { + readers += 1 + var isTimeoutExpired = false + while (!closed && !hasMessages && !isTimeoutExpired) { + isTimeoutExpired = !awaitReaders(awaiter) + } + readers -= 1 + + fetchValueOpt() + } finally { + lock.unlock() + } + } + + final override def isClosed: Boolean = { + lock.lock() + val result = if (syncValue.nonEmpty) { + false + } else { + closed + } + lock.unlock() + result + } + + /* This method assumes the lock is being held. */ + final override protected def hasCapacity: Boolean = { + if (syncValue.isEmpty && (readers > 0)) { + true + } else if (syncValue.isDefined) { + false + } else { + val myThreadId = Thread.currentThread().getId + + var foundOtherThread = false + readWaiters.foreach(waiter => if (waiter.threadId != myThreadId) foundOtherThread = true) + + foundOtherThread + } + } + + /* This method assumes the lock is being held. */ + final override protected def hasMessages: Boolean = { + syncValue.isDefined && Thread.currentThread().getId != sender + } + + /* This method assumes the lock is being held. */ + final protected def fetchValueOpt(): Option[T] = { + if (syncValue.nonEmpty) { + notifyWriters() + } + val v = syncValue + syncValue = None + sender = -1 + v + } + + /* This method assumes the lock is being held. */ + final private def notifySyncReaders(): Unit = { + if (readers > 0) { + crd.signal() + } else { + if (!readWaiters.isEmpty) { + val count = readWaiters.size + var waiter = readWaiters.returnHeadAndRotate() + var i = 0 + while (i < count && waiter.threadId == sender) { + i += 1 + waiter = readWaiters.returnHeadAndRotate() + } + + if (waiter.threadId != sender) { + waiter.sem.release() + } + } + } + } +} diff --git a/src/main/scala/com/github/yruslan/channel/TimeChannels.scala b/src/main/scala/com/github/yruslan/channel/TimeChannels.scala new file mode 100644 index 0000000..d7c86c6 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/TimeChannels.scala @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import java.time.Instant +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration + +/** + * The object contains channel generators similar to GoLang: https://gobyexample.com/tickers + */ +object TimeChannels { + /** + * Create a channel that gets a single message after the specified duration, and then closes. + * + * Example: + * {{{ + * val after = TimeChannels.after(Duration(10, TimeUnit.MILLISECONDS)) + * after.recv() + * // 10 ms has passed. + * // You don't need to close the channel + * }}} + */ + def after(duration: Duration)(implicit executor: ExecutionContext): ReadChannel[Instant] = { + val channel = new AsyncChannel[Instant](1) + Future { + Thread.sleep(duration.toMillis) + try { + channel.sendAndClose(Instant.now()) + } catch { + case _: IllegalStateException => // Ignore if the channel is already closed + } + } + + channel + } + + /** + * Create a ticker channel, see https://gobyexample.com/tickers + * + * {{{ + * val ticker = TimeChannels.ticker(Duration(10, TimeUnit.MILLISECONDS)) + * val instant = ticker.recv() + * ticker.close() + * }}} + * + */ + def ticker(duration: Duration)(implicit executor: ExecutionContext): Channel[Instant] = { + val channel = new SyncChannel[Instant] + Future { + while (!channel.isClosed) { + Thread.sleep(duration.toMillis) + try { + channel.send(Instant.now()) + } catch { + case _: IllegalStateException => // Ignore if the channel is already closed + } + } + } + + channel + } +} diff --git a/src/main/scala/com/github/yruslan/channel/WaitGroup.scala b/src/main/scala/com/github/yruslan/channel/WaitGroup.scala new file mode 100644 index 0000000..4f3cf51 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/WaitGroup.scala @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.exception.NegativeWaitGroupCounter + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantLock + +/** + * WaitGroup is a synchronization primitive ported from GoLang allows one or more threads to wait until a set of + * operations completes. + */ +class WaitGroup { + private val lock = new ReentrantLock() + private val cond = lock.newCondition() + private val counter: AtomicInteger = new AtomicInteger(0) + + /** + * Add adds delta, which may be negative, to the WaitGroup counter. If the counter becomes zero, all waiters + * wake up and can continue. + * + * If the counter goes negative, the `NegativeWaitGroupCounter` exception is thrown at the current thread + * and all waiters. + */ + @throws[NegativeWaitGroupCounter] + def add(delta: Int = 1): Unit = { + val after = counter.addAndGet(delta) + + if (after <= 0) { + lock.lock() + try { + cond.signalAll() + if (after < 0) + throw new NegativeWaitGroupCounter + } finally { + lock.unlock() + } + } + } + + /** + * Done decrements the WaitGroup counter by one. + * + * If the counter goes negative, the `NegativeWaitGroupCounter` exception is thrown at the current thread + * and all waiters. + */ + @throws[NegativeWaitGroupCounter] + def done(): Unit = { + add(-1) + } + + /** + * Await blocks until the WaitGroup counter is zero. + * + * In GoLang the corresponding method is `wait()`. However, in Java `wait()` is a method of `Object` and is used + * for implementation of monitors, and cannot be overridden. Therefore, this method is named `await()` instead. + * + * If the counter goes negative, the `NegativeWaitGroupCounter` exception is thrown. + */ + @throws[InterruptedException] + @throws[NegativeWaitGroupCounter] + def await(): Unit = { + lock.lock() + try { + while (counter.get() > 0) { + cond.await() + } + if (counter.get() < 0) { + throw new NegativeWaitGroupCounter + } + } finally { + lock.unlock() + } + } +} + +object WaitGroup { + def apply(): WaitGroup = new WaitGroup +} diff --git a/src/main/scala/com/github/yruslan/channel/WriteChannel.scala b/src/main/scala/com/github/yruslan/channel/WriteChannel.scala new file mode 100644 index 0000000..3a4f55b --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/WriteChannel.scala @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.impl.Selector + +import scala.concurrent.duration.Duration + +trait WriteChannel[-T] extends ChannelLike { + @throws[InterruptedException] + def send(value: T): Unit + + def trySend(value: T): Boolean + + @throws[InterruptedException] + def trySend(value: T, timeout: Duration): Boolean + + def sender(value: T)(action: => Unit): Selector + + def close(): Unit +} diff --git a/src/main/scala/com/github/yruslan/channel/exception/NegativeWaitGroupCounter.scala b/src/main/scala/com/github/yruslan/channel/exception/NegativeWaitGroupCounter.scala new file mode 100644 index 0000000..b491362 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/exception/NegativeWaitGroupCounter.scala @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel.exception + +class NegativeWaitGroupCounter extends Exception("Negative WaitGroup counter") diff --git a/src/main/scala/com/github/yruslan/channel/impl/Awaiter.scala b/src/main/scala/com/github/yruslan/channel/impl/Awaiter.scala new file mode 100644 index 0000000..d26a977 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/impl/Awaiter.scala @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel.impl + +import java.time.Instant +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition + +import scala.concurrent.duration.Duration + +class Awaiter(timeout: Duration) { + private val startInstant = Instant.now() + private val timeoutMilli = if (timeout.isFinite) timeout.toMillis else 0L + + @throws[InterruptedException] + def await(cond: Condition): Boolean = { + if (timeout == Duration.Zero) { + false + } else { + if (timeout.isFinite) { + cond.await(timeLeft(), TimeUnit.MILLISECONDS) + } else { + cond.await() + } + !isTimeoutExpired + } + } + + @inline + final private def isTimeoutExpired: Boolean = { + if (!timeout.isFinite) { + false + } else { + elapsedTime() >= timeoutMilli + } + } + + @inline + final private def elapsedTime(): Long = { + val now = Instant.now() + java.time.Duration.between(startInstant, now).toMillis + } + + @inline + final private def timeLeft(): Long = { + val timeLeft = timeoutMilli - elapsedTime() + if (timeLeft < 0L) 0L else timeLeft + } + +} diff --git a/src/main/scala/com/github/yruslan/channel/impl/Selector.scala b/src/main/scala/com/github/yruslan/channel/impl/Selector.scala new file mode 100644 index 0000000..e2ac531 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/impl/Selector.scala @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel.impl + +import com.github.yruslan.channel.Channel + +private[channel] abstract class Selector(val isSender: Boolean, + val isDefault: Boolean, + val channel: Channel[_]) { + def sendRecv(Opt: Option[Waiter]): Int + def afterAction(): Unit +} diff --git a/src/main/scala/com/github/yruslan/channel/impl/SimpleLinkedList.scala b/src/main/scala/com/github/yruslan/channel/impl/SimpleLinkedList.scala new file mode 100644 index 0000000..03c80bf --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/impl/SimpleLinkedList.scala @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel.impl + +/** + * The motivation for implementing this tiny linked list is to have very fast append and remove operations + * when the number of elements is small. + */ +class SimpleLinkedList[T] { + class Elem[U](var el: U, var next: Elem[U]) + + private var first: Elem[T] = _ + private var last: Elem[T] = _ + private var count = 0 + + def append(a: T): Unit = this.synchronized { + val newElement = new Elem[T](a, null) + if (first == null) { + first = newElement + last = first + } else { + last.next = newElement + last = newElement + } + count += 1 + } + + def isEmpty: Boolean = { + first == null + } + + def nonEmpty: Boolean = { + first != null + } + + def size: Int = count + + def head: T = { + if (first == null) { + throw new NoSuchElementException + } else { + first.el + } + } + + def returnHeadAndRotate(): T = this.synchronized { + if (first == null) { + throw new NoSuchElementException + } else { + val ret = first.el + rotate() + ret + } + } + + def remove(a: T): Unit = this.synchronized { + if (first == null) { + return + } + + if (first.el == a) { + dropFirst() + } else { + var removed = false + var p = first + while (p.next != null && !removed) { + if (p.next.el == a) { + p.next = p.next.next + if (p.next == null) { + last = p + } + removed = true + count -= 1 + } else { + p = p.next + } + } + } + } + + def containsNot(a: T): Boolean = this.synchronized { + var p = first + while (p != null) { + if (p.el != a) { + return true + } + p = p.next + } + false + } + + def findNot(a: T): Option[T] = this.synchronized { + var p = first + while (p != null) { + if (p.el != a) { + return Option(p.el) + } + p = p.next + } + None + } + + def clear(): Unit = this.synchronized { + first = null + last = null + count = 0 + } + + def foreach[U](f: T => U): Unit = this.synchronized { + var p = first + while (p != null) { + f(p.el) + p = p.next + } + } + + private def dropFirst(): Unit = { + if (first == last) { + clear() + } else { + first = first.next + count -= 1 + } + } + + private def rotate(): Unit = { + if (first != last) { + val tmp = first + first = tmp.next + last.next = tmp + last = tmp + tmp.next = null + } + } +} diff --git a/src/main/scala/com/github/yruslan/channel/impl/Waiter.scala b/src/main/scala/com/github/yruslan/channel/impl/Waiter.scala new file mode 100644 index 0000000..63d7d98 --- /dev/null +++ b/src/main/scala/com/github/yruslan/channel/impl/Waiter.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel.impl + +import java.util.concurrent.Semaphore + +class Waiter( + val sem: Semaphore, + val threadId: Long + ) diff --git a/src/test/scala/com/github/yruslan/channel/ChannelFilterSuite.scala b/src/test/scala/com/github/yruslan/channel/ChannelFilterSuite.scala new file mode 100644 index 0000000..ed33038 --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/ChannelFilterSuite.scala @@ -0,0 +1,298 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import org.scalatest.wordspec.AnyWordSpec + +import java.time.Instant +import java.util.concurrent.Executors +import scala.concurrent._ +import scala.concurrent.duration.{Duration, MILLISECONDS} + +// This import is required for Scala 2.13 since it has a builtin Channel object. + +class ChannelFilterSuite extends AnyWordSpec { + implicit private val ec: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) + + "Channel.filter()" should { + "filter input channel on recv()" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v != 2) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + val v1 = ch2.recv() + val v2 = ch2.recv() + + assert(v1 == 1) + assert(v2 == 3) + } + + "filter input channel on tryRecv()" when { + "values either available or not" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v != 2) + + val v1 = ch2.tryRecv() + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + val v2 = ch2.tryRecv() + val v3 = ch2.tryRecv() + + assert(v1.isEmpty) + assert(v2.contains(1)) + assert(v3.contains(3)) + } + "values available, but don't match" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v == 3) + + ch1.send(1) + ch1.send(2) + + val v1 = ch2.tryRecv() + ch1.close() + + assert(v1.isEmpty) + } + "values available, but after non-matching ones" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v == 3) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + + val v1 = ch2.tryRecv() + ch1.close() + + assert(v1.contains(3)) + } + } + + "filter input channel on tryRecv(duration)" when { + val timeout = Duration(200, MILLISECONDS) + "values either available or not" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v != 2) + + val v1 = ch2.tryRecv(timeout) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + val v2 = ch2.tryRecv(timeout) + val v3 = ch2.tryRecv(timeout) + + assert(v1.isEmpty) + assert(v2.contains(1)) + assert(v3.contains(3)) + } + "values available, but don't match" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v == 3) + + ch1.send(1) + ch1.send(2) + + val v1 = ch2.tryRecv(timeout) + ch1.close() + + assert(v1.isEmpty) + } + "values available, but after non-matching ones" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v == 3) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + + val v1 = ch2.tryRecv(timeout) + ch1.close() + + assert(v1.contains(3)) + } + "filter the correct value even with 0 millisecond timeout" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.filter(v => v == 2) + + ch1.send(1) + ch1.send(2) + + val v1 = ch2.tryRecv(Duration.Zero) + ch1.close() + + assert(v1.contains(2)) + } + + "return None if no values match and zero timeout" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.filter(v => v == 3) + + ch1.send(1) + ch1.send(2) + + val v1 = ch2.tryRecv(Duration.Zero) + ch1.close() + + assert(v1.isEmpty) + } + + "return instantly on empty channel and zero timeout" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.filter(v => v == 3) + + val start = Instant.now() + val v1 = ch2.tryRecv(Duration.Zero) + val finish = Instant.now() + + assert(v1.isEmpty) + assert(java.time.Duration.between(start, finish).toMillis <= 10L) + } + + "return None after proper wait for a non-zero timeout" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.filter(v => v == 3) + + ch1.send(1) + ch1.send(2) + + val start = Instant.now() + val v1 = ch2.tryRecv(Duration(10, MILLISECONDS)) + val finish = Instant.now() + + assert(v1.isEmpty) + assert(java.time.Duration.between(start, finish).toMillis >= 10L) + } + } + + "filter input channel on recver()" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v != 2) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + var v1 = 0 + Channel.select(ch2.recver { v => v1 = v }) + Channel.select(ch2.recver { v => v1 = v }) + + var v2 = 0 + Channel.select(ch2.recver { v => v2 = v }) + + assert(v1 == 1) + assert(v2 == 3) + } + + "filter input channel on fornew()" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v != 2) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + + var v1 = 0 + + ch2.fornew(v => v1 = v) + + assert(v1 == 3) + } + + "filter input channel on foreach()" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.filter(v => v != 3) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + var v1 = 0 + + ch2.foreach(v => v1 = v) + + assert(v1 == 2) + } + + "filter input channel on filter(filter())" in { + val ch1 = Channel.make[Int](5) + + val ch2 = ch1.filter(v => v != 3) + val ch3 = ch2.filter(v => v != 4) + + ch1.send(1) + ch1.send(3) + ch1.send(4) + ch1.send(2) + ch1.send(3) + ch1.close() + + val v1 = ch3.recv() + val v2 = ch3.recv() + + assert(v1 == 1) + assert(v2 == 2) + } + + "filter input channel on filter(map())" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1.map(v => v * 2) + val ch3 = ch2.filter(v => v != 4) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + val v1 = ch3.recv() + val v2 = ch3.recv() + + assert(v1 == 2) + assert(v2 == 6) + } + + } + +} diff --git a/src/test/scala/com/github/yruslan/channel/ChannelMapSuite.scala b/src/test/scala/com/github/yruslan/channel/ChannelMapSuite.scala new file mode 100644 index 0000000..3873285 --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/ChannelMapSuite.scala @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import org.scalatest.wordspec.AnyWordSpec + +import java.util.concurrent.Executors +import scala.concurrent._ +import scala.concurrent.duration.{Duration, MILLISECONDS} + +// This import is required for Scala 2.13 since it has a builtin Channel object. +import com.github.yruslan.channel.Channel + +class ChannelMapSuite extends AnyWordSpec { + implicit private val ec: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) + + "Channel.map()" should { + "map output with recv()" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.map(v => v.toString) + + ch1.send(1) + ch1.send(2) + ch1.close() + + val s1 = ch2.recv() + val s2 = ch2.recv() + + assert(s1 == "1") + assert(s2 == "2") + } + + "map output with tryRecv()" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.map(v => v.toString) + + val s1 = ch2.tryRecv() + + ch1.send(1) + ch1.close() + + val s2 = ch2.tryRecv() + + assert(s1.isEmpty) + assert(s2.contains("1")) + } + + "map output with tryRecv(duration)" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.map(v => v.toString) + + val s1 = ch2.tryRecv(Duration(2, MILLISECONDS)) + + ch1.send(1) + ch1.close() + + val s2 = ch2.tryRecv(Duration(2, MILLISECONDS)) + + assert(s1.isEmpty) + assert(s2.contains("1")) + } + + "map output with recver()" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.map(v => v.toString) + + ch1.send(1) + ch1.close() + + var v1 = "" + Channel.select(ch2.recver{v => v1 = v}) + + assert(v1 == "1") + } + + "map output with fornew()" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.map(v => v.toString) + + ch1.send(1) + + var v1 = "" + + ch2.fornew(v => v1 = v) + + assert(v1 == "1") + } + + "map output with foreach()" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.map(v => v.toString) + + ch1.send(1) + ch1.close() + + var v1 = "" + + ch2.foreach(v => v1 = v) + + assert(v1 == "1") + } + + "map output with map(map())" in { + val ch1 = Channel.make[Int](2) + + val ch2 = ch1.map(v => v.toString) + + val ch3 = ch2.map(v => v.toInt + 1) + + ch1.send(1) + ch1.send(2) + ch1.close() + + val v1 = ch3.recv() + val v2 = ch3.recv() + + assert(v1 == 2) + assert(v2 == 3) + } + + } + +} diff --git a/src/test/scala/com/github/yruslan/channel/ChannelSuite.scala b/src/test/scala/com/github/yruslan/channel/ChannelSuite.scala new file mode 100644 index 0000000..c9fdbe4 --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/ChannelSuite.scala @@ -0,0 +1,1298 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.Channel.select +import com.github.yruslan.channel.mocks.AsyncChannelSpy +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpec + +import java.time.Instant +import java.util.concurrent.{Executors, TimeUnit} +import scala.collection.mutable.ListBuffer +import scala.concurrent._ +import scala.concurrent.duration.{Duration, SECONDS} +import TestUtils._ + +// This import is required for Scala 2.13 since it has a builtin Channel object. +import com.github.yruslan.channel.Channel + +class ChannelSuite extends AnyWordSpec with BeforeAndAfterAll { + implicit private var ec: ExecutionContextExecutor = _ + + private val ex = Executors.newFixedThreadPool(16) + + override def beforeAll(): Unit = { + super.beforeAll() + ec = ExecutionContext.fromExecutor(ex) + } + + override def afterAll(): Unit = { + ex.shutdown() + super.afterAll() + } + + "send() and recv()" should { + "work for asynchronous channels in a single threaded setup" in { + val ch1 = Channel.make[Int](1) + val ch2 = ch1 + + ch1.send(10) + val v = ch2.recv() + + assert(v == 10) + } + + "async sent messages should arrive in FIFO order" in { + val ch1 = Channel.make[Int](5) + val ch2 = ch1 + + ch1.send(1) + ch1.send(2) + ch1.send(3) + + val v1 = ch2.recv() + + ch1.send(4) + + val v2 = ch2.recv() + val v3 = ch2.recv() + val v4 = ch2.recv() + + assert(v1 == 1) + assert(v2 == 2) + assert(v3 == 3) + assert(v4 == 4) + } + + "closed channel can still be used to receive pending messages" in { + val ch = Channel.make[Int](5) + + ch.send(1) + ch.send(2) + ch.send(3) + + val v1 = ch.recv() + ch.close() + val v2 = ch.recv() + val v3 = ch.recv() + + assert(v1 == 1) + assert(v2 == 2) + assert(v3 == 3) + } + + "closing a synchronous channel should block if the pending message is not received" in { + val ch = Channel.make[Int] + + val f = Future { + Thread.sleep(10L) + ch.close() + } + + ch.send(1) + + intercept[TimeoutException] { + Await.result(f, Duration.create(50, TimeUnit.MILLISECONDS)) + } + } + + "closing a synchronous channel should block until the pending message is not received" in { + val start = Instant.now() + val ch = Channel.make[Int](0) + var v: Option[Int] = None + + Future { + Thread.sleep(120L) + v = Option(ch.recv()) + } + + Future { + ch.send(1) + } + + val f2 = Future { + Thread.sleep(50L) + ch.close() + } + + Await.result(f2, Duration.create(2, TimeUnit.SECONDS)) + val finish = Instant.now() + + assert(v.nonEmpty) + assert(v.contains(1)) + assert(java.time.Duration.between(start, finish).toMillis >= 60L) + assert(java.time.Duration.between(start, finish).toMillis < 2000L) + } + + "reading a closed channel should thrown an exception" in { + val ch = Channel.make[Int](5) + + ch.send(1) + + val v1 = ch.recv() + ch.close() + + assert(v1 == 1) + + val ex = intercept[IllegalStateException] { + ch.recv() + } + + assert(ex.getMessage.contains("Attempt to receive from a closed channel")) + + val v4 = ch.tryRecv() + assert(v4.isEmpty) + } + + "can't send to a closed channel" in { + val ch = Channel.make[Int](2) + + val ok = ch.trySend(1) + assert(ok) + + ch.close() + + val ex = intercept[IllegalStateException] { + ch.send(2) + } + + assert(ex.getMessage.contains("Attempt to send to a closed channel")) + + val v1 = ch.tryRecv() + val v2 = ch.tryRecv() + + assert(v1.contains(1)) + assert(v2.isEmpty) + } + + "sync send/recv should block" in { + val start = Instant.now + val ch = Channel.make[Int] + + val f = Future { + Thread.sleep(100) + ch.recv() + } + + ch.send(100) + + val results = Await.result(f, Duration.apply(2, SECONDS)) + + val finish = Instant.now + + assert(results == 100) + assert(java.time.Duration.between(start, finish).toMillis >= 100L) + assert(java.time.Duration.between(start, finish).toMillis < 2000L) + } + + "unbounded channels should non-block" in { + val ch = Channel.makeUnbounded[Int] + + Range(0, 10000).foreach { i => + ch.send(i) + } + + ch.close() + + val v1 = ch.recv() + + Range(0, 9998).foreach { i => + ch.recv() + } + + val v2 = ch.recv() + + assert(v1 == 0) + assert(v2 == 9999) + } + + "send/foreach should handle interrupted thread" in { + val ch = new AsyncChannelSpy[Int](1) + + val output = new ListBuffer[Int] + + val t1 = createThread { + ch.foreach(a => + ch.synchronized { + output += a + } + ) + } + + val t2 = createThread { + ch.foreach(a => + ch.synchronized { + output += a + } + ) + } + + setUncaughtExceptionHandler(t1) ((_, _) => {}) + + t1.start() + t2.start() + + ch.send(100) + ch.send(200) + ch.send(300) + + t1.interrupt() + + ch.send(400) + ch.send(500) + ch.send(600) + ch.send(700) + ch.close() + + t2.join(2000) + t1.join(2000) + + assert(output.sorted.toList == List(100, 200, 300, 400, 500, 600, 700)) + assert(ch.numOfReaders == 0) + assert(ch.numOfWriters == 0) + } + + "send/recv should handle interrupted thread" in { + val ch = new AsyncChannelSpy[Int](1) + + val output = new ListBuffer[Int] + + val t1 = createThread { + ch.send(100) + ch.send(200) + ch.send(300) + } + + setUncaughtExceptionHandler(t1) ((_, _) => {}) + + val t2 = createThread { + ch.send(400) + Thread.sleep(30) + ch.send(500) + ch.send(600) + ch.send(700) + ch.close() + } + + t1.start() + t2.start() + + output += ch.recv() + + t1.interrupt() + + ch.foreach(v => output += v) + + t2.join(2000) + t1.join(2000) + + assert(output.contains(400)) + assert(output.contains(500)) + assert(output.contains(600)) + assert(output.contains(700)) + assert(ch.numOfReaders == 0) + assert(ch.numOfWriters == 0) + } + } + + "trySend() for sync channels" should { + "handle non-blocking way" when { + "data is available" in { + val ch = Channel.make[String] + + Future { + ch.recv() + } + + Thread.sleep(30) + val ok = ch.trySend("test", Duration.Zero) + + assert(ok) + } + + "data is not available" in { + val ch = Channel.make[String] + + val ok = ch.trySend("test2", Duration.Zero) + + assert(!ok) + } + + "return false if the channel is closed" in { + val ch = Channel.make[Int] + ch.close() + + val ok = ch.trySend(2) + + assert(!ok) + } + } + + "handle finite timeouts" when { + "timeout is not expired" in { + val ch = Channel.make[String] + + val t = createThread { + Thread.sleep(10) + ch.recv() + } + + t.start() + + val ok = ch.trySend("test", Duration.create(200, TimeUnit.MILLISECONDS)) + t.join() + + assert(ok) + } + + "timeout is expired" in { + val ch = Channel.make[String] + + val f = Future { + Thread.sleep(100) + ch.recv() + } + + val ok = ch.trySend("test2", Duration.create(10, TimeUnit.MILLISECONDS)) + ch.send("test1") + + Await.result(f, Duration.apply(2, SECONDS)) + + assert(!ok) + } + } + + "handle infinite timeouts" when { + "data is ready" in { + val ch = Channel.make[String] + + val f = Future { + Thread.sleep(10) + ch.recv() + } + + val ok = ch.trySend("test", Duration.Inf) + Await.result(f, Duration.apply(2, SECONDS)) + + assert(ok) + } + + "data is not ready" in { + val ch = Channel.make[String] + + val f = Future { + ch.trySend("test2", Duration.Inf) + } + + intercept[TimeoutException] { + Await.result(f, Duration.create(50, TimeUnit.MILLISECONDS)) + } + } + } + } + + "trySend() for async channels" should { + "handle non-blocking way" when { + "data is available" in { + val ch = Channel.make[String](1) + + val ok = ch.trySend("test", Duration.Zero) + + assert(ok) + } + + "data is not available" in { + val ch = Channel.make[String](1) + + val ok1 = ch.trySend("test1", Duration.Zero) + val ok2 = ch.trySend("test2", Duration.Zero) + + assert(ok1) + assert(!ok2) + } + + "return false if the channel is closed" in { + val ch = Channel.make[Int](1) + ch.close() + + val ok = ch.trySend(2) + + assert(!ok) + } + } + + "handle finite timeouts" when { + "timeout is not expired" in { + val ch = Channel.make[String](1) + + val t = createThread { + Thread.sleep(10) + ch.recv() + } + + t.start() + + ch.trySend("test1", Duration.Zero) + val ok = ch.trySend("test", Duration.create(1000, TimeUnit.MILLISECONDS)) + + t.join() + + assert(ok) + } + + "timeout is expired" in { + val ch = Channel.make[String](1) + + val f = Future { + Thread.sleep(100) + ch.recv() + } + + ch.trySend("test1", Duration.Zero) + val ok = ch.trySend("test2", Duration.create(10, TimeUnit.MILLISECONDS)) + + Await.result(f, Duration.apply(2, SECONDS)) + + assert(!ok) + } + } + + "handle infinite timeouts" when { + "data is ready" in { + val ch = Channel.make[String](1) + + val f = Future { + Thread.sleep(10) + ch.recv() + } + + ch.trySend("test1", Duration.Zero) + val ok = ch.trySend("test", Duration.Inf) + Await.result(f, Duration.apply(2, SECONDS)) + + assert(ok) + } + + "data is not ready" in { + val ch = Channel.make[String](1) + + val f = Future { + ch.trySend("test1", Duration.Zero) + ch.trySend("test2", Duration.Inf) + } + + intercept[TimeoutException] { + Await.ready(f, Duration.create(50, TimeUnit.MILLISECONDS)) + } + } + } + } + + "tryRecv() for sync channels" should { + "handle non-blocking way" when { + "data is available" in { + val ch = Channel.make[String] + val wg = WaitGroup() + + wg.add(1) + val t = createThread { + wg.done() + ch.send("test") + } + t.start() + wg.await() + Thread.sleep(100) + val v = ch.tryRecv(Duration.Zero) + t.join() + + assert(v.nonEmpty) + assert(v.contains("test")) + } + + "data is not available" in { + val ch = Channel.make[String] + + val v = ch.tryRecv(Duration.Zero) + + assert(v.isEmpty) + } + } + + "handle finite timeouts" when { + "timeout is not expired" in { + val ch = Channel.make[String] + val wg = WaitGroup() + wg.add() + + val f = Future { + wg.done() + ch.send("test") + } + wg.await() + + val v = ch.tryRecv(Duration.create(200, TimeUnit.MILLISECONDS)) + Await.result(f, Duration.apply(2, SECONDS)) + + assert(v.isDefined) + assert(v.contains("test")) + } + + "timeout is expired" in { + val ch = Channel.make[String] + + val start = Instant.now() + val v = ch.tryRecv(Duration.create(10, TimeUnit.MILLISECONDS)) + val finish = Instant.now() + + assert(v.isEmpty) + assert(java.time.Duration.between(start, finish).toMillis >= 10L) + } + } + + "handle infinite timeouts" when { + "data is ready" in { + val ch = Channel.make[String] + + val f = Future { + Thread.sleep(10) + ch.trySend("test", Duration.Zero) + } + + val start = Instant.now() + val v = ch.tryRecv(Duration.Inf) + val finish = Instant.now() + + Await.result(f, Duration.apply(2, SECONDS)) + + assert(v.isDefined) + assert(v.contains("test")) + assert(java.time.Duration.between(start, finish).toMillis >= 10L) + assert(java.time.Duration.between(start, finish).toMillis < 2000L) + } + + "data is not ready" in { + val ch = Channel.make[String] + + val f = Future { + ch.tryRecv(Duration.Inf) + } + + intercept[TimeoutException] { + Await.ready(f, Duration.create(50, TimeUnit.MILLISECONDS)) + } + } + } + } + + "tryRecv() for async channels" should { + "handle non-blocking way" when { + "data is available" in { + val ch = Channel.make[String](1) + + ch.send("test") + val v = ch.tryRecv(Duration.Zero) + + assert(v.nonEmpty) + assert(v.contains("test")) + } + + "data is not available" in { + val ch = Channel.make[String](1) + + val v = ch.tryRecv(Duration.Zero) + + assert(v.isEmpty) + } + } + + "handle finite timeouts" when { + "timeout is not expired" in { + val ch = Channel.make[String](1) + + val f = Future { + Thread.sleep(10) + ch.send("test") + } + + val start = Instant.now() + val v = ch.tryRecv(Duration.create(200, TimeUnit.MILLISECONDS)) + Await.result(f, Duration.apply(2, SECONDS)) + val finish = Instant.now() + + assert(v.isDefined) + assert(v.contains("test")) + assert(java.time.Duration.between(start, finish).toMillis >= 10L) + assert(java.time.Duration.between(start, finish).toMillis < 200L) + } + + "timeout is expired" in { + val ch = Channel.make[String](1) + + val start = Instant.now() + val v = ch.tryRecv(Duration.create(10, TimeUnit.MILLISECONDS)) + val finish = Instant.now() + + assert(v.isEmpty) + assert(java.time.Duration.between(start, finish).toMillis >= 10L) + } + } + + "handle infinite timeouts" when { + "data is ready" in { + val ch = Channel.make[String](1) + + val f = Future { + Thread.sleep(10) + ch.send("test") + } + + val start = Instant.now() + val v = ch.tryRecv(Duration.Inf) + val finish = Instant.now() + + Await.result(f, Duration.apply(2, SECONDS)) + + assert(v.isDefined) + assert(v.contains("test")) + assert(java.time.Duration.between(start, finish).toMillis >= 10L) + assert(java.time.Duration.between(start, finish).toMillis < 2000L) + } + + "data is not ready" in { + val ch = Channel.make[String] + + val f = Future { + ch.tryRecv(Duration.Inf) + } + + intercept[TimeoutException] { + Await.ready(f, Duration.create(50, TimeUnit.MILLISECONDS)) + } + } + } + } + + "fornew()" should { + "handle synchronous channels" when { + "there is data" in { + val ch = Channel.make[String] + + val processed = ListBuffer[String]() + + Future { + ch.send("test") + } + + Thread.sleep(30) + ch.fornew(v => processed += v) + + assert(processed.nonEmpty) + assert(processed.size == 1) + assert(processed.head == "test") + } + + "there are no data" in { + val ch = Channel.make[String] + + val processed = ListBuffer[String]() + + ch.fornew(v => processed += v) + + assert(processed.isEmpty) + } + } + + "handle asynchronous channels" when { + "there is data" in { + val ch = Channel.make[String](3) + + val processed = ListBuffer[String]() + + ch.send("test1") + ch.send("test2") + ch.send("test3") + + ch.fornew(v => processed += v) + + assert(processed.nonEmpty) + assert(processed.size == 3) + assert(processed.head == "test1") + assert(processed(1) == "test2") + assert(processed(2) == "test3") + } + + "there are no data" in { + val ch = Channel.make[String](3) + + val processed = ListBuffer[String]() + + ch.send("test1") + ch.recv() + + ch.fornew(v => processed += v) + + assert(processed.isEmpty) + } + } + } + + "foreach()" should { + "iterate through all values of a synchronous channel" in { + val ch = Channel.make[Int] + val values = new ListBuffer[Int] + + val fut = Future { + ch.foreach(v => values.synchronized { + values += v + }) + } + + val start = Instant.now() + ch.send(1) + ch.send(2) + ch.send(3) + ch.close() + val finish = Instant.now() + + Await.ready(fut, Duration.create(2, TimeUnit.SECONDS)) + + assert(values.toList == 1 :: 2 :: 3 :: Nil) + assert(java.time.Duration.between(start, finish).toMillis < 2000L) + } + + "iterate through all values of a asynchronous channel" in { + val ch = Channel.make[Int](3) + ch.send(1) + ch.send(2) + ch.send(3) + ch.close() + + val values = new ListBuffer[Int] + ch.foreach(v => values += v) + assert(values.toList == 1 :: 2 :: 3 :: Nil) + } + } + + "select()" should { + "work with a single channel" in { + val channel = Channel.make[Int](2) + + val ok = Channel.select(channel.sender(1) {}) + val value = channel.tryRecv() + + assert(ok) + assert(value.contains(1)) + } + + "ping pong messages between 2 workers" in { + for (_ <- Range(0, 100)) { + val actions = new StringBuffer() + + /* Full qualified name 'com.github.yruslan.channel.Channel' is used here to make IntelliJ IDEA happy. */ + def worker(workerNum: Int, ch1: com.github.yruslan.channel.Channel[Int], ch2: com.github.yruslan.channel.Channel[Int]): Unit = { + for (i <- Range(0, 10)) { + val k = select( + ch1.recver(n => { + actions.append(s"R$workerNum$n-") + }), + ch2.sender(i) { + actions.append(s"S$workerNum$i-") + } + ) + if (!k) throw new IllegalArgumentException("Failing the worker") + } + } + + val channel1 = Channel.make[Int] + val channel2 = Channel.make[Int] + + val fut1 = Future { + worker(1, channel1, channel2) + } + + val fut2 = Future { + worker(2, channel2, channel1) + } + + Await.result(fut1, Duration.apply(4, SECONDS)) + Await.result(fut2, Duration.apply(4, SECONDS)) + + // 10 messages sent and received, by 2 workers + assert(actions.toString.length == 80) + } + } + + "work with two channels" in { + val channel1 = Channel.make[Int](1) + val channel2 = Channel.make[Int](1) + + channel1.send(1) + channel2.send(2) + + var value1 = 0 + var value2 = 0 + + val selected1 = Channel.select( + channel1.recver { v => value1 = v }, + channel2.recver { v => value1 = v } + ) + + val selected2 = Channel.select( + channel1.recver { v => value2 = v }, + channel2.recver { v => value2 = v }) + + assert(selected1) + assert(selected2) + assert(value1 == 1 || value1 == 2) + assert(value2 == 1 || value2 == 2) + assert(value1 != value2) + } + + "work with when sending and receiving in the same select statement" in { + val channel = Channel.make[Int] + + var value1 = 0 + var value2 = 0 + + val fut = Future { + channel.send(1) + value1 = channel.recv() + } + + val selected1 = Channel.select( + channel.sender(2) {}, + channel.recver { v => value2 = v } + ) + + val selected2 = Channel.select( + channel.sender(2) {}, + channel.recver { v => value2 = v } + ) + + Await.result(fut, Duration.apply(4, SECONDS)) + + assert(selected1) + assert(selected2) + assert(value1 == 2) + assert(value2 == 1) + } + } + + "select() for synchronous channels" should { + "select should not send if there is no receiver" in { + val channel = Channel.make[Int] + + val t1 = createThread{ + Channel.select( + channel.sender(1) {} + ) + + channel.recv() + fail("Should not execute here") + } + + t1.start() + t1.join(200) + assert(t1.isAlive) + t1.interrupt() + } + + "select should not send to the same thread" in { + val channel = Channel.make[Int] + + val t1 = createThread { + var output = 0 + Channel.select( + channel.sender(1) {}, + channel.recver(v => output = v) + ) + + fail("Should not execute here") + } + + t1.start() + t1.join(200) + assert(t1.isAlive) + t1.interrupt() + } + + "select should execute default selector if others are not available" in { + val channel = Channel.make[Int] + + var reached = false + + var output = 0 + Channel.select( + channel.sender(1) {}, + channel.recver(v => output = v), + Channel.default{ reached = true } + ) + + assert(reached) + } + + "select should throw an exception if more than one default section is encountered" in { + val channel = Channel.make[Int] + var reached = false + var output = 0 + + assertThrows[IllegalArgumentException] { + Channel.select( + channel.sender(1) {}, + channel.recver(v => output = v), + Channel.default { + reached = true + }, + Channel.default { + reached = true + } + ) + } + } + + "ping pong between 2 workers with single channel" in { + for (_ <- Range(0, 100)) { + val actions = new StringBuffer() + + /* Full qualified name 'com.github.yruslan.channel.Channel' is used here to make IntelliJ IDEA happy. */ + def worker(workerNum: Int, ch: com.github.yruslan.channel.Channel[Int]): Unit = { + for (i <- Range(0, 10)) { + val k = select( + ch.recver(n => { + actions.append(s"R$workerNum$n-") + }), + ch.sender(i) { + actions.append(s"S$workerNum$i-") + } + ) + if (!k) throw new IllegalArgumentException("Failing the worker") + } + } + + val channel = Channel.make[Int] + + val fut1 = Future { + worker(1, channel) + } + + val fut2 = Future { + worker(2, channel) + } + + Await.result(fut1, Duration.apply(4, SECONDS)) + Await.result(fut2, Duration.apply(4, SECONDS)) + + // 10 messages sent and received, by 2 workers + assert(actions.toString.length == 80) + } + } + } + + "trySelect()" should { + "handle finite timeouts" when { + "timeout is not expired" in { + val channel = Channel.make[Int](1) + + Future { + Thread.sleep(1) + channel.send(1) + } + + var value1 = 0 + val selected = Channel.trySelect(Duration.create(200, TimeUnit.MILLISECONDS), + channel.recver(v => value1 = v) + ) + + assert(selected) + assert(value1 == 1) + } + + "timeout is expired" in { + val channel = Channel.make[Int](1) + + Future { + Thread.sleep(50) + channel.send(1) + } + + var value1 = 0 + val selected = Channel.trySelect(Duration.create(1, TimeUnit.MILLISECONDS), + channel.recver(v => value1 = v) + ) + + assert(!selected) + assert(value1 == 0) + } + } + + "handle zero timeouts" when { + "when data is available" in { + val channel = Channel.make[Int](1) + + channel.send(1) + + var value1 = 0 + val selected = Channel.trySelect(Duration.Zero, + channel.recver(v => value1 = v) + ) + + assert(selected) + assert(value1 == 1) + } + + "when data is not available" in { + val channel = Channel.make[Int](1) + + var value1 = 0 + val selected = Channel.trySelect(Duration.Zero, + channel.recver(v => value1 = v) + ) + + assert(!selected) + assert(value1 == 0) + } + } + + "handle infinite timeouts" when { + "when data is available" in { + val channel = Channel.make[Int](1) + + channel.send(1) + + var value1 = 0 + val selected = Channel.trySelect(Duration.Inf, + channel.recver(v => value1 = v) + ) + + assert(selected) + assert(value1 == 1) + } + + "when data is not available" in { + val channel = Channel.make[Int](1) + + val fut = Future { + Channel.trySelect(Duration.Inf, + channel.recver(v => {})) + } + + intercept[TimeoutException] { + Await.ready(fut, Duration.create(50, TimeUnit.MILLISECONDS)) + } + } + } + } + + "toList" should { + "covert a channel to a list" in { + val ch1 = Channel.make[Int](3) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + val lst = ch1.toList + + assert(lst == List(1, 2, 3)) + } + + "convert a mapped filtered channel to a list" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1 + .map(v => v * 2) + .filter(v => v != 4) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + val lst = ch2.toList + + assert(lst == List(2, 6)) + } + } + "for comprehension" should { + "test for comprehension with yield" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1 + .map(v => v * 2) + .filter(v => v != 4) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + val outputChannel = for { + a <- ch2 + if a > 5 + } yield a + + assert(outputChannel.recv() == 6) + } + + "test for comprehension with foreach" in { + val ch1 = Channel.make[Int](3) + + val ch2 = ch1 + .map(v => v * 2) + .filter(v => v != 4) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + var out = 0 + for { + a <- ch2 + if a > 5 + } out = a + + assert(out == 6) + } + + "test for comprehension with 2 channels" in { + val ch1 = Channel.make[Int](3) + val ch2 = Channel.make[Int](3) + + val ch3 = ch1 + .map(v => v * 2) + .filter(v => v != 4) + + ch1.send(1) + ch1.send(2) + ch1.send(3) + ch1.close() + + ch2.send(100) + ch2.send(200) + ch2.send(300) + ch2.close() + + var out = 0 + for { + a <- ch2 + b <- ch3 + if a > 5 + if b > 5 + } out = a + b + + assert(out == 106) + } + } + + "master/worker model" should { + // Worker that operates on 2 channels + def worker2(workerNum: Int, + results: ListBuffer[String], + channell: ReadChannel[Int], + channel2: ReadChannel[String]): Unit = { + while (!channell.isClosed && !channel2.isClosed) { + select( + channell.recver { v => + results.synchronized { + results += s"$workerNum->i$v" + } + }, + channel2.recver { v => + results.synchronized { + results += s"$workerNum->s$v" + } + }) + Thread.sleep(10) + } + } + + "work with one thread and two channels" in { + val channell = Channel.make[Int] + val channel2 = Channel.make[String] + val results = new ListBuffer[String] + + val worked1Fut = Future { + Thread.sleep(20) + worker2(1, results, channell, channel2) + } + + channell.send(1) + channel2.send("A") + channell.send(2) + channel2.send("B") + channell.send(3) + channel2.send("C") + + channell.close() + channel2.close() + + Await.result(worked1Fut, Duration.apply(4, SECONDS)) + + assert(results.size == 6) + } + + "work with two threads and two channels" in { + val channell = Channel.make[Int] + val channel2 = Channel.make[String] + val results = new ListBuffer[String] + + val worked1Fut = Future { + Thread.sleep(20) + worker2(1, results, channell, channel2) + } + + val worked2Fut = Future { + Thread.sleep(30) + worker2(2, results, channell, channel2) + } + + channell.send(1) + channel2.send("A") + channell.send(2) + channel2.send("B") + channell.send(3) + channel2.send("C") + + Thread.sleep(300) + channell.close() + channel2.close() + + Await.result(worked1Fut, Duration.apply(4, SECONDS)) + Await.result(worked2Fut, Duration.apply(4, SECONDS)) + + assert(results.size == 6) + } + } + + "channel casting" should { + "support covariance" in { + val ch1 = Channel.make[String](1) + val ch2: ReadChannel[Any] = ch1 + ch1.send("hello") + assert(ch2.recv() == "hello") + } + + "support contravariance" in { + val ch1 = Channel.make[Any](1) + val ch2: WriteChannel[String] = ch1 + ch2.send("hello") + assert(ch1.recv() == "hello") + } + } +} diff --git a/src/test/scala/com/github/yruslan/channel/GuaranteesSuite.scala b/src/test/scala/com/github/yruslan/channel/GuaranteesSuite.scala new file mode 100644 index 0000000..dfbfbc1 --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/GuaranteesSuite.scala @@ -0,0 +1,353 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import java.time.Instant +import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit} +import org.scalatest.wordspec.AnyWordSpec +import com.github.yruslan.channel.Channel.{select, trySelect} +import com.github.yruslan.channel.TestUtils.createThread + +import scala.collection.JavaConverters.iterableAsScalaIterableConverter +import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.util.Random + +// This import is required for Scala 2.13 since it has a builtin Channel object. +import com.github.yruslan.channel.Channel + +class GuaranteesSuite extends AnyWordSpec { + implicit private val ec: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) + + "Progress guarantees are provided" when { + "one channel always has messages to process" in { + // Here we test that when + // * There a worker processing messages from 2 channels and. + // * Processing a message takes more time than a new message to arrive in each channel. + // Then + // * Messages from both channels have a chance to be processed, none of the channels + // gets priority. + val ch1 = Channel.make[Int](20) + val ch2 = Channel.make[Int](20) + + val processed1Times = new ListBuffer[Instant] + val processed2Times = new ListBuffer[Instant] + + val fut = Future { + while (!ch1.isClosed && !ch2.isClosed) + select( + ch1.recver { _ => + processed1Times += Instant.now() + Thread.sleep(30) + }, + ch2.recver { _ => + processed2Times += Instant.now() + Thread.sleep(30) + }) + } + + for (_ <- Range(0, 20)) { + ch1.send(1) + ch2.send(2) + Thread.sleep(20) + } + ch1.close() + ch2.close() + + Await.result(fut, Duration.create(2, TimeUnit.SECONDS)) + + // At least one message from ch1 and ch2 should be processed + assert(processed1Times.nonEmpty) + assert(processed2Times.nonEmpty) + } + } + + "Fairness is provided" when { + "several sync input sync output channels are active, a channel is selected randomly" in { + val in1 = Channel.make[Int] + val in2 = Channel.make[Int] + + val out1 = Channel.make[Int] + val out2 = Channel.make[Int] + + testFairness(in1, in2, out1, out2) + } + + "several sync input async output channels are active, a channel is selected randomly" in { + val in1 = Channel.make[Int] + val in2 = Channel.make[Int] + + val out1 = Channel.make[Int](1) + val out2 = Channel.make[Int](1) + + testFairness(in1, in2, out1, out2) + } + + "several async input sync output channels are active, a channel is selected randomly" in { + val in1 = Channel.make[Int](1) + val in2 = Channel.make[Int](1) + + val out1 = Channel.make[Int] + val out2 = Channel.make[Int] + + testFairness(in1, in2, out1, out2) + } + + "several async input async output channels are active, a channel is selected randomly" in { + val in1 = Channel.make[Int](1) + val in2 = Channel.make[Int](1) + + val out1 = Channel.make[Int](1) + val out2 = Channel.make[Int](1) + + testFairness(in1, in2, out1, out2) + } + } + + "Priority is honored for priority selects" when { + "several sync input sync output channels are active, a channel is selected according to priority" in { + val in1 = Channel.make[Int] + val in2 = Channel.make[Int] + + val out1 = Channel.make[Int] + val out2 = Channel.make[Int] + + testFairness(in1, in2, out1, out2, isPriority = true) + } + + "several sync input async output channels are active, a channel is selected according to priority" in { + val in1 = Channel.make[Int] + val in2 = Channel.make[Int] + + val out1 = Channel.make[Int](1) + val out2 = Channel.make[Int](1) + + testFairness(in1, in2, out1, out2, isPriority = true) + } + + "several async input sync output channels are active, a channel is selected according to priority" in { + val in1 = Channel.make[Int](1) + val in2 = Channel.make[Int](1) + + val out1 = Channel.make[Int] + val out2 = Channel.make[Int] + + testFairness(in1, in2, out1, out2, isPriority = true) + } + + "several async input async output channels are active, a channel is selected according to priority" in { + val in1 = Channel.make[Int](1) + val in2 = Channel.make[Int](1) + + val out1 = Channel.make[Int](1) + val out2 = Channel.make[Int](1) + + testFairness(in1, in2, out1, out2, isPriority = true) + } + } + + "Measures" should { + "match the expected single consumer throughput and latency" in { + val N = 50000 + val n = 4 + val in = Channel.make[Int](n * 2) + val out = Channel.make[Int](n * 2) + + val start = System.nanoTime() + + val threads = Range(0, n).map { _ => + val t = createThread { + in.foreach { a => out.send(a) } + } + t.start() + t + } + + for (i <- 1 to N) { + in.send(i) + out.recv() + } + + in.close() + val finish = System.nanoTime() + + threads.foreach(_.join()) + + val throughput = N.toDouble / ((finish - start).toDouble / 1000000000.0) + val latency = ((finish - start).toDouble / 1000.0) / N.toDouble + + println(s"[info] - Single consumer throughput: ${throughput.round} rps, latency: ${latency.round} micro seconds") + + assert(throughput > 100) + assert(latency < 1000) + } + + "match the expected 4 producer/consumer throughput and latency" in { + val N = 50000 + val n = 4 + val in = Channel.make[Long](n * 2) + val out = Channel.make[(Long, Long)](n * 2) + val wg = WaitGroup() + + wg.add(N) + + val producers = Range(0, n).map { _ => + val t = createThread { + in.foreach { a => out.send((a, System.nanoTime())) } + } + t.start() + t + } + + val start = System.nanoTime() + + val durations = new ConcurrentLinkedQueue[Long]() + + val consumers = Range(0, n).map { _ => + val t = createThread { + out.foreach { case(start, end) => + durations.add(end - start) + wg.done() + } + } + t.start() + t + } + + for (_ <- 1 to N) { + in.send(System.nanoTime()) + } + in.close() + + wg.await() + + val finish = System.nanoTime() + + out.close() + producers.foreach(_.join()) + consumers.foreach(_.join()) + + val throughput = N.toDouble / ((finish - start).toDouble / 1000000000.0) + val durationArray = durations.asScala.toArray + val latency = durationArray.map(x => x.toDouble).sum / durationArray.length.toDouble / 2000.0 // chain of 2 send/recv + + println(s"[info] - 4 producer/consumer throughput: ${throughput.round} rps, latency: ${latency.round} micro seconds") + + assert(throughput > 100) + assert(latency < 1000) + } + } + + + /* Full qualified name 'com.github.yruslan.channel.Channel' is used here to make IntelliJ IDEA happy. */ + private def testFairness(in1: com.github.yruslan.channel.Channel[Int], + in2: com.github.yruslan.channel.Channel[Int], + out1: com.github.yruslan.channel.Channel[Int], + out2: com.github.yruslan.channel.Channel[Int], + isPriority: Boolean = false): Unit = { + val results = new ListBuffer[(Int, Int)] + + def balancer(input1: ReadChannel[Int], + input2: ReadChannel[Int], + output1: WriteChannel[Int], + output2: WriteChannel[Int], + finishChannel: ReadChannel[Boolean]): Unit = { + var v: Int = 0 + var exit = false + + while (!exit) { + trySelect( + Duration.Inf, + isPriority, + input1.recver(x => v = x), + input2.recver(x => v = x), + finishChannel.recver(_ => exit = true) + ) + + if (!exit) { + trySelect( + Duration.Inf, + isPriority, + output1.sender(v) {}, + output2.sender(v) {} + ) + } + } + } + + def worker(num: Int, input1: ReadChannel[Int]): Unit = { + input1.foreach(x => { + Thread.sleep(Random.nextInt(5) + 10) + results.synchronized { + results.append((num, 2 * x)) + } + }) + } + + val finish = Channel.make[Boolean] + + // Launching workers + val w = Range(0, 4).map(i => Future { + if (i % 2 == 0) + worker(i, out1) + else + worker(i, out2) + }) + + // Launching the load balancer + val bal = Future { + balancer(in1, in2, out1, out2, finish) + } + + // Sending out the work + Range(1, 101).foreach(i => { + if (i % 2 == 0) { + in1.send(i) + } else { + in2.send(i) + } + Thread.sleep(10) + }) + + // Letting the balancer and the worker threads that the processing is finished + finish.send(true) + out1.close() + out2.close() + + // Waiting for the futures to finish + w.foreach(f => Await.result(f, Duration.create(4, TimeUnit.SECONDS))) + Await.result(bal, Duration.create(4, TimeUnit.SECONDS)) + + // Correctness + assert(results.size == 100) + assert(results.map(_._2).sum == 10100) // sum(1..100)*2 = 101*50*2 = 5050*2 = 10100 + + val processedBy = Range(0, 4).map(w => results.count(_._1 == w)) + + if (isPriority) { + // Priority + assert(processedBy.min < 5) + assert(processedBy.max > 45) + } else { + // Fairness + assert(processedBy.min > 15) + assert(processedBy.max < 35) + } + } + +} diff --git a/src/test/scala/com/github/yruslan/channel/TestUtils.scala b/src/test/scala/com/github/yruslan/channel/TestUtils.scala new file mode 100644 index 0000000..6c85c06 --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/TestUtils.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import com.github.yruslan.channel.exception.NegativeWaitGroupCounter + +import java.lang.Thread.UncaughtExceptionHandler + +object TestUtils { + def createThread(action: => Unit): Thread = { + // Creating thread in the Scala 2.11 compatible way. + // Please do not remove 'new Runnable' + new Thread(new Runnable { + override def run(): Unit = action + }) + } + + def setUncaughtExceptionHandler(thread: Thread)(handler: (Thread, Throwable) => Unit): Unit = { + // Creating thread in the Scala 2.11 compatible way. + // Please do not remove 'new UncaughtExceptionHandler' + thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler { + def uncaughtException(t: Thread, e: Throwable): Unit = handler(t, e) + }) + } +} diff --git a/src/test/scala/com/github/yruslan/channel/TimeChannelsSuite.scala b/src/test/scala/com/github/yruslan/channel/TimeChannelsSuite.scala new file mode 100644 index 0000000..ab68683 --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/TimeChannelsSuite.scala @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel + +import org.scalatest.wordspec.AnyWordSpec + +import java.time.Instant +import java.util.concurrent.TimeUnit +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration + +class TimeChannelsSuite extends AnyWordSpec { + "ticker" should { + "generate a ticker that ticks until closed" in { + val start = Instant.now() + + val ticker = TimeChannels.ticker(Duration(10, TimeUnit.MILLISECONDS)) + + ticker.recv() + val middle = Instant.now() + ticker.recv() + val finish = Instant.now() + ticker.close() + + assert(java.time.Duration.between(start, middle).toMillis >= 10L) + assert(java.time.Duration.between(start, middle).toMillis <= 500L) + assert(java.time.Duration.between(middle, finish).toMillis >= 10L) + assert(java.time.Duration.between(middle, finish).toMillis <= 500L) + } + + "generate a ticker that ticks until closed111" in { + val ticker = TimeChannels.ticker(Duration(10, TimeUnit.MILLISECONDS)) + + val start = Instant.now() + ticker.close() + val finish = Instant.now() + + assert(java.time.Duration.between(start, finish).toMillis < 10L) + } + } + + "after" should { + "generate a single shot read channel" in { + val start = Instant.now() + + val after = TimeChannels.after(Duration(10, TimeUnit.MILLISECONDS)) + + after.recv() + val finish = Instant.now() + + assert(java.time.Duration.between(start, finish).toMillis >= 10L) + assert(after.isClosed) + } + } +} diff --git a/src/test/scala/com/github/yruslan/channel/WaitGroupSuite.scala b/src/test/scala/com/github/yruslan/channel/WaitGroupSuite.scala new file mode 100644 index 0000000..9b191fe --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/WaitGroupSuite.scala @@ -0,0 +1,55 @@ +package com.github.yruslan.channel + +import com.github.yruslan.channel.exception.NegativeWaitGroupCounter +import org.scalatest.wordspec.AnyWordSpec +import TestUtils._ + +class WaitGroupSuite extends AnyWordSpec { + "WaitGroup" should { + "wait for all done" in { + val wg = WaitGroup() + val n = 10 + for (_ <- 1 to n) { + wg.add() + createThread { + Thread.sleep(1000) + wg.done() + }.start() + } + wg.await() + } + + "done()" should { + "throw an exception if the counter is negative" in { + val wg = WaitGroup() + wg.add() + wg.done() + assertThrows[NegativeWaitGroupCounter] { + wg.done() + } + } + } + + "wait()" should { + "throw an exception if the counter is negative" in { + val wg = WaitGroup() + wg.add(2) + + val t = createThread { + Thread.sleep(1000) + wg.add(-3) + } + + setUncaughtExceptionHandler(t) { (_, e) => + assert(e.isInstanceOf[NegativeWaitGroupCounter]) + } + + t.start() + assertThrows[NegativeWaitGroupCounter] { + wg.await() + } + t.join() + } + } + } +} diff --git a/src/test/scala/com/github/yruslan/channel/impl/SimpleLinkListSuite.scala b/src/test/scala/com/github/yruslan/channel/impl/SimpleLinkListSuite.scala new file mode 100644 index 0000000..5f72e38 --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/impl/SimpleLinkListSuite.scala @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel.impl + +import org.scalatest.wordspec.AnyWordSpec + +private class SimpleLinkListSuite extends AnyWordSpec { + "SimpleLinkedList" should { + "initialize as an empty list" in { + val lst = new SimpleLinkedList[Int] + + assert(lst.isEmpty) + assert(lst.size == 0) + } + } + + "append()" should { + "add a single element to the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + + assert(!lst.isEmpty) + assert(lst.size == 1) + assert(lst.head == 1) + } + + "add several elements to the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + assert(!lst.isEmpty) + assert(lst.size == 3) + assert(lst.head == 1) + } + } + + "remove()" should { + "remove a single element to the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.remove(1) + + assert(lst.isEmpty) + assert(lst.size == 0) + } + + "remove an element in the middle of the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + lst.remove(1) + + assert(!lst.isEmpty) + assert(lst.size == 2) + assert(lst.head == 2) + } + + "remove an element in the end of the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + lst.remove(3) + + assert(!lst.isEmpty) + assert(lst.size == 2) + assert(lst.head == 1) + } + + "remove two elements from the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + lst.remove(3) + assert(lst.head == 1) + + lst.remove(1) + assert(lst.head == 2) + + assert(!lst.isEmpty) + assert(lst.size == 1) + } + + "remove 1ll elements from the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + lst.remove(2) + lst.remove(3) + lst.remove(1) + + assert(lst.isEmpty) + assert(lst.size == 0) + } + + "not remove element that does not exist in the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + lst.remove(4) + + assert(!lst.isEmpty) + assert(lst.size == 3) + } + + "do nothing on empty list" in { + val lst = new SimpleLinkedList[Int] + + lst.remove(1) + + assert(lst.isEmpty) + } + } + + "clear()" should { + "empty the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + lst.clear() + + assert(lst.isEmpty) + assert(lst.size == 0) + } + + } + + "foreach()" should { + "process nothing for an empty list" in { + val lst = new SimpleLinkedList[Int] + + var i = 0 + lst.foreach(_ => i += 1) + + assert(i == 0) + } + + "process all elements of the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + var i = 0 + lst.foreach(a => i += a) + + assert(i == 6) + } + } + + "head()" should { + "return head element if it is the only one in the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + + assert(lst.head == 1) + assert(lst.head == 1) + + assert(!lst.isEmpty) + assert(lst.size == 1) + } + "return head element of the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + assert(lst.head == 1) + assert(lst.head == 1) + assert(lst.head == 1) + + assert(!lst.isEmpty) + assert(lst.size == 3) + } + + "throw an exception on empty list" in { + val lst = new SimpleLinkedList[Int] + + intercept[NoSuchElementException] { + lst.head + } + } + } + "returnHeadAndRotate()" should { + "return head element if it is the only one in the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + + assert(lst.returnHeadAndRotate() == 1) + assert(lst.returnHeadAndRotate() == 1) + assert(lst.returnHeadAndRotate() == 1) + + assert(!lst.isEmpty) + assert(lst.size == 1) + } + "return head element and rotate the rest of the list" in { + val lst = new SimpleLinkedList[Int] + + lst.append(1) + lst.append(2) + lst.append(3) + + // "The definition of insanity is doing the same thing over and over again, but expecting different results." + // -- Albert Einstein + assert(lst.returnHeadAndRotate() == 1) + assert(lst.returnHeadAndRotate() == 2) + assert(lst.returnHeadAndRotate() == 3) + assert(lst.returnHeadAndRotate() == 1) + assert(lst.returnHeadAndRotate() == 2) + assert(lst.returnHeadAndRotate() == 3) + + assert(!lst.isEmpty) + assert(lst.size == 3) + } + } + + "throw an exception on empty list" in { + val lst = new SimpleLinkedList[Int] + + intercept[NoSuchElementException] { + lst.returnHeadAndRotate() + } + } + +} diff --git a/src/test/scala/com/github/yruslan/channel/mocks/AsyncChannelSpy.scala b/src/test/scala/com/github/yruslan/channel/mocks/AsyncChannelSpy.scala new file mode 100644 index 0000000..bba8a4c --- /dev/null +++ b/src/test/scala/com/github/yruslan/channel/mocks/AsyncChannelSpy.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2020 Ruslan Yushchenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.yruslan.channel.mocks + +import com.github.yruslan.channel.AsyncChannel + +class AsyncChannelSpy[T](maxCapacity: Int) extends AsyncChannel[T](maxCapacity) { + def numOfReaders: Int = readers + + def numOfWriters: Int = writers +} diff --git a/version.sbt b/version.sbt new file mode 100644 index 0000000..b5400cd --- /dev/null +++ b/version.sbt @@ -0,0 +1 @@ +ThisBuild / version := "0.2.2-SNAPSHOT"