Skip to content

Add Monix task utilities #543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.avsystem.commons
package concurrent

import monix.eval.Task
import monix.execution.Scheduler
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

import scala.concurrent.TimeoutException
import scala.concurrent.duration._

class JvmTaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures {

import com.avsystem.commons.concurrent.TaskExtensions._

private implicit val scheduler: Scheduler = Scheduler.global

// This test does not work in SJS runtime (but the method itself does)
test("lazyTimeout") {
val result = Task.never.lazyTimeout(50.millis, "Lazy timeout").runToFuture.failed.futureValue
result shouldBe a[TimeoutException]
result.getMessage shouldBe "Lazy timeout"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ object ObservableExtensions extends ObservableExtensions {
*/
def headOptL: Task[Opt[T]] = obs.headOptionL.map(_.toOpt)

/**
* Returns a [[monix.eval.Task Task]] which emits the first <b>non-null</b> item for which the predicate holds.
*/
def findOptL(p: T => Boolean): Task[Opt[T]] = obs.findL(e => e != null && p(e)).map(_.toOpt)

/** Suppress the duplicate elements emitted by the source Observable.
*
* WARNING: this requires unbounded buffering.
Expand Down Expand Up @@ -79,5 +84,15 @@ object ObservableExtensions extends ObservableExtensions {
obs
.foldLeftL(factory.newBuilder)(_ += _)
.map(_.result())

/** Returns a [[monix.eval.Task Task]] that upon evaluation
* will collect all items from the source into a [[Map]] instance
* using provided functions to compute keys and values.
*
* WARNING: for infinite streams the process will eventually blow up
* with an out of memory error.
*/
def mkMapL[K, V](keyFun: T => K, valueFun: T => V): Task[Map[K, V]] =
obs.map(v => (keyFun(v), valueFun(v))).toL(Map)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.avsystem.commons
package concurrent

import com.avsystem.commons.concurrent.TaskExtensions.{TaskCompanionOps, TaskOps}
import com.avsystem.commons.misc.Timestamp
import monix.eval.Task
import monix.reactive.Observable

import java.util.concurrent.TimeUnit
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

trait TaskExtensions {
implicit def taskOps[T](task: Task[T]): TaskOps[T] = new TaskOps(task)

implicit def taskCompanionOps(task: Task.type): TaskCompanionOps.type = TaskCompanionOps
}

object TaskExtensions extends TaskExtensions {
final class TaskOps[T](private val task: Task[T]) extends AnyVal {
/**
* Similar to [[Task.timeoutWith]] but exception instance is created lazily (for performance)
*/
def lazyTimeout(after: FiniteDuration, msg: => String): Task[T] =
task.timeoutTo(after, Task.defer(Task.raiseError(new TimeoutException(msg))))

/**
* Similar to [[Task.tapEval]], accepts simple consumer function as an argument
*/
def tapL(f: T => Unit): Task[T] =
task.map(_.setup(f))

/**
* Similar to [[Task.tapError]], accepts [[PartialFunction]] as an argument
*/
def tapErrorL[B](f: PartialFunction[Throwable, B]): Task[T] =
task.tapError(t => Task(f.applyOpt(t)))
}

object TaskCompanionOps {
import com.avsystem.commons.concurrent.ObservableExtensions.observableOps

/** A [[Task]] of [[Opt.Empty]] */
def optEmpty[A]: Task[Opt[A]] = Task.pure(Opt.Empty)

def traverseOpt[A, B](opt: Opt[A])(f: A => Task[B]): Task[Opt[B]] =
opt.fold(Task.optEmpty[B])(a => f(a).map(_.opt))

def fromOpt[A](maybeTask: Opt[Task[A]]): Task[Opt[A]] = maybeTask match {
case Opt(task) => task.map(_.opt)
case Opt.Empty => Task.optEmpty
}

def traverseMap[K, V, A, B](map: Map[K, V])(f: (K, V) => Task[(A, B)]): Task[Map[A, B]] =
Observable.fromIterable(map).mapEval({ case (key, value) => f(key, value) }).toL(Map)

def traverseMapValues[K, A, B](map: Map[K, A])(f: (K, A) => Task[B]): Task[Map[K, B]] =
traverseMap(map)({ case (key, value) => f(key, value).map(key -> _) })

def currentTimestamp: Task[Timestamp] =
Task.clock.realTime(TimeUnit.MILLISECONDS).map(Timestamp(_))

def usingNow[T](useNow: Timestamp => Task[T]): Task[T] =
currentTimestamp.flatMap(useNow)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,29 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
Observable.fromIterable(ints).headOptL.runToFuture.futureValue shouldBe ints.headOpt
}
}

test("headOptL - null handling") {
Observable.fromIterable(Seq(null, "abc", "xyz")) .headOptL.runToFuture.futureValue shouldBe Opt.Empty
}

test("findOptL") {
forAll { ints: List[Int] =>
Observable.fromIterable(ints).findOptL(_ > 1).runToFuture.futureValue shouldBe ints.findOpt(_ > 1)
}
}

test("findOptL - null handling") {
Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.some("abc")
Observable.fromIterable(Seq(null, null)).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.Empty
Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_.startsWith("x")).runToFuture.futureValue shouldBe Opt.some("xyz")
}

test("distinct") {
forAll { ints: List[Int] =>
Observable.fromIterable(ints).distinct.toListL.runToFuture.futureValue shouldBe ints.distinct
}
}

test("distinctBy") {
forAll { ints: List[Int] =>
val f: Int => Int = _ % 256
Expand All @@ -33,17 +51,20 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
ints.foldLeft(MLinkedHashMap.empty[Int, Int])((map, v) => f(v) |> (key => map.applyIf(!_.contains(key))(_ += key -> v))).valuesIterator.toList
}
}

test("sortedL") {
forAll { ints: List[Int] =>
Observable.fromIterable(ints).sortedL.runToFuture.futureValue shouldBe ints.sorted
}
}

test("sortedByL") {
forAll { ints: List[Int] =>
val f: Int => Int = _ % 256
Observable.fromIterable(ints).sortedByL(f).runToFuture.futureValue shouldBe ints.sortBy(f)
}
}

test("toL") {
forAll { ints: List[(Int, Int)] =>
def testFactory[T](factory: Factory[(Int, Int), T])(implicit position: Position) =
Expand Down Expand Up @@ -78,4 +99,9 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
}
}

test("mkMapL") {
forAll { ints: List[Int] =>
Observable.fromIterable(ints).mkMapL(_ % 3, _ + 2).runToFuture.futureValue shouldBe ints.mkMap(_ % 3, _ + 2)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.avsystem.commons
package concurrent

import monix.eval.Task
import monix.execution.Scheduler
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

class TaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures {
import com.avsystem.commons.concurrent.TaskExtensions._

private implicit val scheduler: Scheduler = Scheduler.global

test("traverseOpt") {
Task.traverseOpt(Opt.empty[Int])(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.Empty
Task.traverseOpt(Opt.some(123))(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.some(123)
}

test("fromOpt") {
Task.fromOpt(Opt.empty[Task[Int]]).runToFuture.futureValue shouldBe Opt.Empty
Task.fromOpt(Opt.some(Task.now(123))).runToFuture.futureValue shouldBe Opt.some(123)
}

test("traverseMap") {
forAll { data: List[(String, Int)] =>
val map = data.toMap
val expected = map.view.map({ case (key, value) => (key + key, value + 2) }).toMap
val result = Task.traverseMap(map)({ case (key, value) => Task((key + key, value + 2)) }).runToFuture.futureValue
result shouldBe expected
}
}

test("traverseMapValues") {
forAll { data: List[(String, Int)] =>
val map = data.toMap
val expected = map.view.mapValues(value => value + 2).toMap
val result = Task.traverseMapValues(map)({ case (key, value) => Task(value + 2) }).runToFuture.futureValue
result shouldBe expected
}
}
}