diff --git a/README.md b/README.md index 5eaea46..ddb80aa 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,27 @@ policy(execptionalAttempt) Note, The domain of the PartialFunction passed to When may cover both the exception thrown _or_ the successful result of the future. +#### FailFast + +`retry.FailFast` allows you to wrap any of the above policies and define which failures should immediately stop the retries. + +The difference between `retry.FailFast` and `retry.When` with a partial function for `Throwable`s is that `retry.When` +passes the execution to another policy after the first retry, whereas `retry.FailFast` uses the inner policy logic +for each retry. For instance, it allows using a policy that retries forever together with a fail fast logic +on some irrecoverable exceptions. + +```scala +val innerPolicy = retry.Backoff.forever +val policy = retry.FailFast(innerPolicy) { + case e: FooException => true + case e: RuntimeException => isFatal(e.getCause) +} + +policy(issueRequest) +``` + +When the provided partial function is not defined at a particular `Throwable`, the retry logic is defined by the wrapped policy. + #### Suggested library usage Since all retry modules now produce a generic interface, a `retry.Policy`, if you wish to write clients of services you may wish to make define diff --git a/retry/src/main/scala/Policy.scala b/retry/src/main/scala/Policy.scala index 1da5edf..5925e22 100644 --- a/retry/src/main/scala/Policy.scala +++ b/retry/src/main/scala/Policy.scala @@ -4,6 +4,7 @@ import odelay.{Delay, Timer} import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try //import scala.language.implicitConversions import scala.util.control.NonFatal @@ -236,6 +237,43 @@ object When { } } +/** A retry policy that wraps another policy and defines which failures immediately + * stop the retries. + * + * {{{ + * val innerPolicy = retry.Backoff.forever + * val policy = retry.FailFast(innerPolicy) { + * case e: FooException => true + * case e: RuntimeException => isFatal(e.getCause) + * } + * val future = policy(issueRequest) + * }}} + * + * When the provided partial function is not defined at a particular throwable, + * the retry logic is defined by the wrapped policy. + */ +object FailFast { + def apply(policy: Policy)(failFastOn: PartialFunction[Throwable, Boolean]): Policy = + new Policy { + def apply[T](promise: PromiseWrapper[T])( + implicit success: Success[T], + executor: ExecutionContext): Future[T] = { + implicit val successWithFailFast = Success[Try[T]] { + case scala.util.Success(res) => success.predicate(res) + case scala.util.Failure(_) => true + } + policy.apply { + promise() + .map(scala.util.Success(_)) + .recover { + case e: Throwable if failFastOn.lift(e).contains(true) => + scala.util.Failure(e) + } + }.map(_.get) + } + } +} + /** Retry policy that incorporates a count */ trait CountingPolicy extends Policy { protected def countdown[T](max: Int, diff --git a/retry/src/test/scala/PolicySpec.scala b/retry/src/test/scala/PolicySpec.scala index db93ebe..4496df5 100644 --- a/retry/src/test/scala/PolicySpec.scala +++ b/retry/src/test/scala/PolicySpec.scala @@ -418,4 +418,203 @@ abstract class PolicySpec extends AsyncFunSpec with BeforeAndAfterAll { } } } + + describe("retry.FailFast") { + it("should not retry on success") { + implicit val success = Success.always + val innerPolicy = Directly() + val counter = new AtomicInteger(0) + val future = FailFast(innerPolicy) { + case _ => false + } { + counter.incrementAndGet() + Future.successful("yay!") + } + future.map(result => assert(counter.get() === 1 && result === "yay!")) + } + + it("should retry number of times specified in the inner policy") { + implicit val success = Success[Int](_ == 3) + val tries = forwardCountingFutureStream().iterator + val innerPolicy = Directly(3) + val future = FailFast(innerPolicy) { + case _ => false + } (tries.next) + future.map(result => assert(success.predicate(result) === true)) + } + + it("should fail when inner policy retries are exceeded") { + implicit val success = Success.always + val innerPolicy = Directly(3) + val counter = new AtomicInteger(0) + val future = FailFast(innerPolicy) { + case _ => false + } { + counter.incrementAndGet() + Future.failed(new RuntimeException("always failing")) + } + // expect failure after 1+3 tries + future.failed.map { t => + assert(counter.get() === 4 && t.getMessage === "always failing") + } + } + + it("should fail fast when predicate matches every throwable") { + implicit val success = Success.always + val innerPolicy = Directly.forever + val counter = new AtomicInteger(0) + val future = FailFast(innerPolicy) { + case _ => true + } { + counter.incrementAndGet() + Future.failed(new RuntimeException("always failing")) + } + future.failed.map { t => + assert(counter.get() === 1 && t.getMessage === "always failing") + } + } + + it("should fail fast when predicate matches a specific throwable") { + implicit val success = Success.always + val innerPolicy = Directly.forever + val counter = new AtomicInteger(0) + val future = FailFast(innerPolicy) { + case e => e.getMessage == "2" + } { + val counterValue = counter.getAndIncrement() + Future.failed(new RuntimeException(counterValue.toString)) + } + future.failed.map { t => + assert(counter.get() === 3 && t.getMessage === "2") + } + } + + it("should repeat on failure until success") { + implicit val success = Success[Boolean](identity) + val retried = new AtomicInteger() + val retriedUntilSuccess = 10000 + def run() = + if (retried.get() < retriedUntilSuccess) { + retried.incrementAndGet() + Future.failed(new RuntimeException) + } else { + Future(true) + } + val innerPolicy = Directly.forever + val policy = FailFast(innerPolicy) { + case _ => false + } + policy(run()).map { result => + assert(result === true) + assert(retried.get() == 10000) + } + } + + it("should repeat on failure with pause until success") { + implicit val success = Success[Boolean](identity) + val retried = new AtomicInteger() + val retriedUntilSuccess = 1000 + def run() = + if (retried.get() < retriedUntilSuccess) { + retried.incrementAndGet() + Future.failed(new RuntimeException) + } else { + Future(true) + } + val innerPolicy = Pause.forever(1.millis) + val policy = FailFast(innerPolicy) { + case _ => false + } + policy(run()).map { result => + assert(result === true) + assert(retried.get() == 1000) + } + } + + it("should repeat on failure with backoff until success") { + implicit val success = Success[Boolean](identity) + val retried = new AtomicInteger() + val retriedUntilSuccess = 5 + def run() = + if (retried.get() < retriedUntilSuccess) { + retried.incrementAndGet() + Future.failed(new RuntimeException) + } else { + Future(true) + } + val innerPolicy = Backoff.forever(1.millis) + val policy = FailFast(innerPolicy) { + case _ => false + } + policy(run()).map { result => + assert(result === true) + assert(retried.get() == 5) + } + } + + it("should repeat on failure with jitter backoff until success") { + implicit val success = Success[Boolean](identity) + val retried = new AtomicInteger() + val retriedUntilSuccess = 10 + def run() = + if (retried.get() < retriedUntilSuccess) { + retried.incrementAndGet() + Future.failed(new RuntimeException) + } else { + Future(true) + } + val innerPolicy = JitterBackoff.forever(1.millis) + val policy = FailFast(innerPolicy) { + case _ => false + } + policy(run()).map { result => + assert(result === true) + assert(retried.get() == 10) + } + } + + it("should repeat on failure with when condition until success") { + implicit val success = Success[Boolean](identity) + class MyException extends RuntimeException + val retried = new AtomicInteger() + val retriedUntilSuccess = 10000 + def run() = + if (retried.get() < retriedUntilSuccess) { + retried.incrementAndGet() + Future.failed(new MyException) + } else { + Future(true) + } + val innerPolicy = When { + case _: MyException => Directly.forever + } + val policy = FailFast(innerPolicy) { + case _ => false + } + policy(run()).map { result => + assert(result === true) + assert(retried.get() == 10000) + } + } + + it("should take precedence over when condition if it also matches fail fast condition") { + implicit val success = Success[Boolean](identity) + class MyException extends RuntimeException("my exception") + val retried = new AtomicInteger() + def run() = { + retried.incrementAndGet() + Future.failed(new MyException) + } + val innerPolicy = When { + case _: MyException => Directly.forever + } + val policy = FailFast(innerPolicy) { + case _ => true + } + policy(run()).failed.map { t => + assert(t.getMessage === "my exception") + assert(retried.get() == 1) + } + } + } }