Skip to content

Add FailFast policy. #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,27 @@ policy(execptionalAttempt)

Note, The domain of the PartialFunction passed to When may cover both the exception thrown _or_ the successful result of the future.

#### FailFast

`retry.FailFast` allows you to wrap any of the above policies and define which failures should immediately stop the retries.

The difference between `retry.FailFast` and `retry.When` with a partial function for `Throwable`s is that `retry.When`
passes the execution to another policy after the first retry, whereas `retry.FailFast` uses the inner policy logic
for each retry. For instance, it allows using a policy that retries forever together with a fail fast logic
on some irrecoverable exceptions.

```scala
val innerPolicy = retry.Backoff.forever
val policy = retry.FailFast(innerPolicy) {
case e: FooException => true
case e: RuntimeException => isFatal(e.getCause)
}

policy(issueRequest)
```

When the provided partial function is not defined at a particular `Throwable`, the retry logic is defined by the wrapped policy.

#### Suggested library usage

Since all retry modules now produce a generic interface, a `retry.Policy`, if you wish to write clients of services you may wish to make define
Expand Down
38 changes: 38 additions & 0 deletions retry/src/main/scala/Policy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import odelay.{Delay, Timer}

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
//import scala.language.implicitConversions
import scala.util.control.NonFatal

Expand Down Expand Up @@ -236,6 +237,43 @@ object When {
}
}

/** A retry policy that wraps another policy and defines which failures immediately
* stop the retries.
*
* {{{
* val innerPolicy = retry.Backoff.forever
* val policy = retry.FailFast(innerPolicy) {
* case e: FooException => true
* case e: RuntimeException => isFatal(e.getCause)
* }
* val future = policy(issueRequest)
* }}}
*
* When the provided partial function is not defined at a particular throwable,
* the retry logic is defined by the wrapped policy.
*/
object FailFast {
def apply(policy: Policy)(failFastOn: PartialFunction[Throwable, Boolean]): Policy =
new Policy {
def apply[T](promise: PromiseWrapper[T])(
implicit success: Success[T],
executor: ExecutionContext): Future[T] = {
implicit val successWithFailFast = Success[Try[T]] {
case scala.util.Success(res) => success.predicate(res)
case scala.util.Failure(_) => true
}
policy.apply {
promise()
.map(scala.util.Success(_))
.recover {
case e: Throwable if failFastOn.lift(e).contains(true) =>
scala.util.Failure(e)
}
}.map(_.get)
}
}
}

/** Retry policy that incorporates a count */
trait CountingPolicy extends Policy {
protected def countdown[T](max: Int,
Expand Down
199 changes: 199 additions & 0 deletions retry/src/test/scala/PolicySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -418,4 +418,203 @@ abstract class PolicySpec extends AsyncFunSpec with BeforeAndAfterAll {
}
}
}

describe("retry.FailFast") {
it("should not retry on success") {
implicit val success = Success.always
val innerPolicy = Directly()
val counter = new AtomicInteger(0)
val future = FailFast(innerPolicy) {
case _ => false
} {
counter.incrementAndGet()
Future.successful("yay!")
}
future.map(result => assert(counter.get() === 1 && result === "yay!"))
}

it("should retry number of times specified in the inner policy") {
implicit val success = Success[Int](_ == 3)
val tries = forwardCountingFutureStream().iterator
val innerPolicy = Directly(3)
val future = FailFast(innerPolicy) {
case _ => false
} (tries.next)
future.map(result => assert(success.predicate(result) === true))
}

it("should fail when inner policy retries are exceeded") {
implicit val success = Success.always
val innerPolicy = Directly(3)
val counter = new AtomicInteger(0)
val future = FailFast(innerPolicy) {
case _ => false
} {
counter.incrementAndGet()
Future.failed(new RuntimeException("always failing"))
}
// expect failure after 1+3 tries
future.failed.map { t =>
assert(counter.get() === 4 && t.getMessage === "always failing")
}
}

it("should fail fast when predicate matches every throwable") {
implicit val success = Success.always
val innerPolicy = Directly.forever
val counter = new AtomicInteger(0)
val future = FailFast(innerPolicy) {
case _ => true
} {
counter.incrementAndGet()
Future.failed(new RuntimeException("always failing"))
}
future.failed.map { t =>
assert(counter.get() === 1 && t.getMessage === "always failing")
}
}

it("should fail fast when predicate matches a specific throwable") {
implicit val success = Success.always
val innerPolicy = Directly.forever
val counter = new AtomicInteger(0)
val future = FailFast(innerPolicy) {
case e => e.getMessage == "2"
} {
val counterValue = counter.getAndIncrement()
Future.failed(new RuntimeException(counterValue.toString))
}
future.failed.map { t =>
assert(counter.get() === 3 && t.getMessage === "2")
}
}

it("should repeat on failure until success") {
implicit val success = Success[Boolean](identity)
val retried = new AtomicInteger()
val retriedUntilSuccess = 10000
def run() =
if (retried.get() < retriedUntilSuccess) {
retried.incrementAndGet()
Future.failed(new RuntimeException)
} else {
Future(true)
}
val innerPolicy = Directly.forever
val policy = FailFast(innerPolicy) {
case _ => false
}
policy(run()).map { result =>
assert(result === true)
assert(retried.get() == 10000)
}
}

it("should repeat on failure with pause until success") {
implicit val success = Success[Boolean](identity)
val retried = new AtomicInteger()
val retriedUntilSuccess = 1000
def run() =
if (retried.get() < retriedUntilSuccess) {
retried.incrementAndGet()
Future.failed(new RuntimeException)
} else {
Future(true)
}
val innerPolicy = Pause.forever(1.millis)
val policy = FailFast(innerPolicy) {
case _ => false
}
policy(run()).map { result =>
assert(result === true)
assert(retried.get() == 1000)
}
}

it("should repeat on failure with backoff until success") {
implicit val success = Success[Boolean](identity)
val retried = new AtomicInteger()
val retriedUntilSuccess = 5
def run() =
if (retried.get() < retriedUntilSuccess) {
retried.incrementAndGet()
Future.failed(new RuntimeException)
} else {
Future(true)
}
val innerPolicy = Backoff.forever(1.millis)
val policy = FailFast(innerPolicy) {
case _ => false
}
policy(run()).map { result =>
assert(result === true)
assert(retried.get() == 5)
}
}

it("should repeat on failure with jitter backoff until success") {
implicit val success = Success[Boolean](identity)
val retried = new AtomicInteger()
val retriedUntilSuccess = 10
def run() =
if (retried.get() < retriedUntilSuccess) {
retried.incrementAndGet()
Future.failed(new RuntimeException)
} else {
Future(true)
}
val innerPolicy = JitterBackoff.forever(1.millis)
val policy = FailFast(innerPolicy) {
case _ => false
}
policy(run()).map { result =>
assert(result === true)
assert(retried.get() == 10)
}
}

it("should repeat on failure with when condition until success") {
implicit val success = Success[Boolean](identity)
class MyException extends RuntimeException
val retried = new AtomicInteger()
val retriedUntilSuccess = 10000
def run() =
if (retried.get() < retriedUntilSuccess) {
retried.incrementAndGet()
Future.failed(new MyException)
} else {
Future(true)
}
val innerPolicy = When {
case _: MyException => Directly.forever
}
val policy = FailFast(innerPolicy) {
case _ => false
}
policy(run()).map { result =>
assert(result === true)
assert(retried.get() == 10000)
}
}

it("should take precedence over when condition if it also matches fail fast condition") {
implicit val success = Success[Boolean](identity)
class MyException extends RuntimeException("my exception")
val retried = new AtomicInteger()
def run() = {
retried.incrementAndGet()
Future.failed(new MyException)
}
val innerPolicy = When {
case _: MyException => Directly.forever
}
val policy = FailFast(innerPolicy) {
case _ => true
}
policy(run()).failed.map { t =>
assert(t.getMessage === "my exception")
assert(retried.get() == 1)
}
}
}
}