diff --git a/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala b/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala index 3f13957..2a52b3a 100644 --- a/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala +++ b/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala @@ -14,18 +14,22 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} * A configurable circuit breaker. For the time being, the only state change detection strategy implemented is based on * the number of consecutive failures. * - * @param name the name of the circuit breaker - * @param failLimit maximum number of consecutive failures before the circuit breaker is tripped (opened) - * @param retryDelay duration until an open/broken circuit breaker lets a call through to verify whether or not it should be reset - * @param isResultFailure partial function to allow users to determine return cases which should be considered as failures + * @param name the name of the circuit breaker + * @param failLimit maximum number of consecutive failures before the circuit breaker is tripped (opened) + * @param retryDelay duration until an open/broken circuit breaker lets a call through to verify whether or not it should be reset + * @param isExponentialBackoff indicating the retry delayed should be increased exponential on consecutive failures + * @param exponentialRetryCap limits the number of times the retryDelay will be increased exponentially, ignored if not exponential backoff + * @param isResultFailure partial function to allow users to determine return cases which should be considered as failures * @param isExceptionNotFailure partial function to allow users to determine exceptions which should not be considered failures - * @param stateChangeListeners listeners that will be notified when the circuit breaker changes state (open <--> closed) - * @param invocationListeners listeners that will be notified whenever the circuit breaker handles a method/function call + * @param stateChangeListeners listeners that will be notified when the circuit breaker changes state (open <--> closed) + * @param invocationListeners listeners that will be notified whenever the circuit breaker handles a method/function call */ class CircuitBreaker private[circuitbreaker] ( val name: String, val failLimit: Int, val retryDelay: FiniteDuration, + val isExponentialBackoff: Boolean = false, + val exponentialRetryCap: Option[Int], val isResultFailure: PartialFunction[Any, Boolean] = { case _ => false }, val isExceptionNotFailure: PartialFunction[Throwable, Boolean] = { case _ => false }, val stateChangeListeners: List[CircuitBreakerStateChangeListener] = List(), @@ -38,6 +42,8 @@ class CircuitBreaker private[circuitbreaker] ( builder.name, builder.failLimit, builder.retryDelay, + builder.isExponentialBackoff, + builder.exponentialRetryCap, builder.isResultFailure, builder.isExceptionNotFailure, builder.stateChangeListeners, @@ -180,9 +186,56 @@ class CircuitBreaker private[circuitbreaker] ( * @param currentState the expected current state * @return true when the state was changed, false when the given state was not the current state */ - def attemptResetBrokenState(currentState: BrokenState): Boolean = { + def attemptResetBrokenState(currentState: State, retryCount: Int): Boolean = { logger.debug(s"Circuit breaker \'$name\', attempting to reset open/broken state") - state.compareAndSet(currentState, new BrokenState(this)) + val result = + state.compareAndSet(currentState, new AttemptResetState(this, retryCount)) + + if (result) { + stateChangeListeners.foreach { listener => + safely(notificationsExecutionContext)( + listener.onAttemptReset, + name, + "notify listener of reset attempt" + ) + } + } + result + } + + /** + * calculate the new retry + * + * @param retryCount what retry is it + * @return the number of ms to wait before retrying + */ + def calcRetryDelay(retryCount: Int): Long = { + + //calc jitter up to 1/10 the current retryDelay + //for exponential backoff + val jitter: Long = if (this.isExponentialBackoff) { + (scala.util.Random.nextFloat() * this.retryDelay.toMillis / 10).toLong + } else { + 0 + } + + val retryCap = this.exponentialRetryCap match { + case Some(x) => x + case None => Int.MaxValue + } + + val exponent: Int = if (this.isExponentialBackoff) { + Math.min(retryCount, retryCap) + } else 0 + + val result = (this.retryDelay.toMillis + jitter) * Math.pow(2, exponent).toLong + + logger.debug( + s"CB retry delay details: jitter $jitter, " + + s"retryCap $retryCap, exponent $exponent delay $result" + ) + + result } /** @@ -196,16 +249,18 @@ class CircuitBreaker private[circuitbreaker] ( /** * @inheritdoc */ - override def isBroken: Boolean = state.get() match { - case _: BrokenState => true - case _ => false - } + override def isBroken: Boolean = + state.get() match { + case _: BrokenState => true + case _: AttemptResetState => true + case _ => false + } /** * @inheritdoc */ override def isWaiting: Boolean = state.get() match { - case s: BrokenState if System.currentTimeMillis() > s.retryAt => true + case _: AttemptResetState => true case _ => false } } @@ -236,7 +291,7 @@ private object CircuitBreaker { private def safely( ec: ExecutionContext )(op: => (String) => Any, str: String, opName: String = ""): Unit = { - implicit val implicitEc = ec + implicit val implicitEc: ExecutionContext = ec Future(op(str)).recover { case NonFatal(e) => logger.warn( @@ -286,7 +341,7 @@ private object CircuitBreaker { override def onFailure(): Unit = incrementFailure() - private[this] def incrementFailure() = { + private[this] def incrementFailure(): Unit = { val currentCount = failureCount.incrementAndGet logger.debug( s"Circuit breaker ${cb.name} increment failure count to $currentCount; fail limit is ${cb.failLimit}" @@ -299,7 +354,15 @@ private object CircuitBreaker { * CircuitBreaker is opened/broken. Invocations fail immediately. */ class BrokenState(cb: CircuitBreaker) extends State { - val retryAt: Long = System.currentTimeMillis() + cb.retryDelay.toMillis + val retryDelay: Long = cb.calcRetryDelay(0) + + //Automatically transition this state at the retry time + implicit val ec: ExecutionContext = ExecutionContext.Implicits.global + Future { + Thread.sleep(retryDelay) + }.onComplete(_ => { + cb.attemptResetBrokenState(this, 0) + }) override def preInvoke(): Unit = { cb.invocationListeners.foreach { listener => @@ -310,8 +373,40 @@ private object CircuitBreaker { ) } + //immediately fail + throw new CircuitBreakerBrokenException( + cb.name, + s"Making ${cb.name} unavailable after ${cb.failLimit} errors" + ) + } + + override def postInvoke(): Unit = { /* do nothing */ } + + override def onThrowable(e: Throwable): Unit = { /* do nothing */ } + + override def onFailure(): Unit = { /* do nothing */ } + } + + /** + * CircuitBreaker is opened/waiting. Invocations are attempted + */ + class AttemptResetState(cb: CircuitBreaker, retryCount: Int = 0) extends State { + val retryAt: Long = retryCount match { + case 0 => System.currentTimeMillis() + case _ => System.currentTimeMillis() + cb.calcRetryDelay(retryCount) + } + + override def preInvoke(): Unit = { + cb.invocationListeners.foreach { listener => + safely(cb.notificationsExecutionContext)( + listener.onInvocationInAttemptResetState, + cb.name, + "notify listener of invocation in attempt reset state" + ) + } + val retry = System.currentTimeMillis > retryAt - if (!(retry && cb.attemptResetBrokenState(this))) { + if (!(retry && cb.attemptResetBrokenState(this, this.retryCount + 1))) { throw new CircuitBreakerBrokenException( cb.name, s"Making ${cb.name} unavailable after ${cb.failLimit} errors" @@ -328,23 +423,28 @@ private object CircuitBreaker { override def onFailure(): Unit = { /* do nothing */ } } + } /** * Builder for [[CircuitBreaker]] * - * @param name the name of the circuit breaker - * @param failLimit maximum number of consecutive failures before the circuit breaker is tripped (opened) - * @param retryDelay duration until an open/broken circuit breaker lets a call through to verify whether or not it should be reset - * @param isResultFailure partial function to allow users to determine return cases which should be considered as failures + * @param name the name of the circuit breaker + * @param failLimit maximum number of consecutive failures before the circuit breaker is tripped (opened) + * @param retryDelay duration until an open/broken circuit breaker lets a call through to verify whether or not it should be reset + * @param isExponentialBackoff indicating the retry delayed should be increased exponential on consecutive failures + * @param exponentialRetryCap limits the number of times the retryDelay will be increased exponentially, ignored if not exponential backoff + * @param isResultFailure partial function to allow users to determine return cases which should be considered as failures * @param isExceptionNotFailure partial function to allow users to determine exceptions which should not be considered failures - * @param stateChangeListeners listeners that will be notified when the circuit breaker changes state (open <--> closed) - * @param invocationListeners listeners that will be notified whenever the circuit breaker handles a method/function call + * @param stateChangeListeners listeners that will be notified when the circuit breaker changes state (open <--> closed) + * @param invocationListeners listeners that will be notified whenever the circuit breaker handles a method/function call */ case class CircuitBreakerBuilder( name: String, failLimit: Int, retryDelay: FiniteDuration, + isExponentialBackoff: Boolean = false, + exponentialRetryCap: Option[Int] = Some(10), isResultFailure: PartialFunction[Any, Boolean] = { case _ => false }, isExceptionNotFailure: PartialFunction[Throwable, Boolean] = { case _ => false }, stateChangeListeners: List[CircuitBreakerStateChangeListener] = List(), diff --git a/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreakerRegistry.scala b/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreakerRegistry.scala index 10aac31..96de1e9 100644 --- a/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreakerRegistry.scala +++ b/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreakerRegistry.scala @@ -12,7 +12,8 @@ object CircuitBreakerRegistry { private val logger = LoggerFactory.getLogger(getClass) - private val circuitBreakerStore = emptyCMap[String, ReadOnlyCircuitBreakerSnapshot] + private val circuitBreakerStore = + emptyCMap[String, ReadOnlyCircuitBreakerSnapshot] /** * Registers a circuit breaker @@ -43,13 +44,15 @@ object CircuitBreakerRegistry { * @param name name of the circuit breaker to remove * @return an option value containing the CircuitBreaker, or None if CircuitBreaker was not present in the map before */ - def remove(name: String): Option[ReadOnlyCircuitBreakerSnapshot] = circuitBreakerStore.remove(name) + def remove(name: String): Option[ReadOnlyCircuitBreakerSnapshot] = + circuitBreakerStore.remove(name) /** * Gets a read-only snapshot of all CircuitBreakers stored in this registry * @return a Map containing a read-only snapshot of all CircuitBreakers stored in this registry */ - def getAll: collection.Map[String, ReadOnlyCircuitBreakerSnapshot] = circuitBreakerStore.readOnlySnapshot() + def getAll: collection.Map[String, ReadOnlyCircuitBreakerSnapshot] = + circuitBreakerStore.readOnlySnapshot() /** * Gets a read-only snapshot of a CircuitBreaker stored in this registry, by its name diff --git a/src/main/scala/com/hootsuite/circuitbreaker/listeners/CircuitBreakerInvocationListener.scala b/src/main/scala/com/hootsuite/circuitbreaker/listeners/CircuitBreakerInvocationListener.scala index 900bae3..d5fd8d4 100644 --- a/src/main/scala/com/hootsuite/circuitbreaker/listeners/CircuitBreakerInvocationListener.scala +++ b/src/main/scala/com/hootsuite/circuitbreaker/listeners/CircuitBreakerInvocationListener.scala @@ -18,4 +18,11 @@ trait CircuitBreakerInvocationListener { * @param name - name of the circuit breaker */ def onInvocationInBrokenState(name: String): Unit = { /* empty */ } + + /** + * Called when a wrapped function/method is called while the circuit breaker is opened (attempt reset) + * + * @param name - name of the circuit breaker + */ + def onInvocationInAttemptResetState(name: String): Unit = { /* empty */ } } diff --git a/src/main/scala/com/hootsuite/circuitbreaker/listeners/CircuitBreakerStateChangeListener.scala b/src/main/scala/com/hootsuite/circuitbreaker/listeners/CircuitBreakerStateChangeListener.scala index e7bf683..ca9d783 100644 --- a/src/main/scala/com/hootsuite/circuitbreaker/listeners/CircuitBreakerStateChangeListener.scala +++ b/src/main/scala/com/hootsuite/circuitbreaker/listeners/CircuitBreakerStateChangeListener.scala @@ -19,6 +19,16 @@ trait CircuitBreakerStateChangeListener { */ def onTrip(name: String): Unit = { /* blank default implementation */ } + /** + * Called when a the circuit breaker is attempting to reset (i.e. open -> waiting) + * + * @param name - name of the circuit breaker + */ + def onAttemptReset(name: String): Unit = { + + /* blank default implementation */ + } + /** * Called when a the circuit breaker is closed (i.e. open -> closed) * diff --git a/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerRegistryTest.scala b/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerRegistryTest.scala index ecd5a11..49ae82d 100644 --- a/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerRegistryTest.scala +++ b/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerRegistryTest.scala @@ -14,7 +14,8 @@ class CircuitBreakerRegistryTest extends FlatSpec with Matchers with BeforeAndAf val retryDelay = Duration(100, TimeUnit.MILLISECONDS) - private def waitUntilRetryDelayHasExpired() = Thread.sleep(2 * retryDelay.toMillis) + private def waitUntilRetryDelayHasExpired() = + Thread.sleep(2 * retryDelay.toMillis) "registry" should "be empty on startup" in { CircuitBreakerRegistry.getAll.isEmpty shouldEqual true @@ -69,7 +70,8 @@ class CircuitBreakerRegistryTest extends FlatSpec with Matchers with BeforeAndAf it should "return a read-once version of the underlying circuit breaker" in { val name = "trip fast" - val actualCircuitBreaker = CircuitBreakerBuilder(name, 1, retryDelay).build() + val actualCircuitBreaker = + CircuitBreakerBuilder(name, 1, retryDelay).build() val lookedUpCircuitBreaker = CircuitBreakerRegistry.get(name).getOrElse(throw new Exception("should've found this!")) diff --git a/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala b/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala index f5cba6a..b4c8a01 100644 --- a/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala +++ b/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala @@ -3,7 +3,7 @@ package com.hootsuite.circuitbreaker import com.hootsuite.circuitbreaker.listeners.{CircuitBreakerInvocationListener, CircuitBreakerStateChangeListener} import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Millis, Seconds, Span} -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.{Assertion, FlatSpec, Matchers} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.{Duration, FiniteDuration} @@ -13,196 +13,357 @@ import scala.util.{Failure, Success} import java.util.concurrent.TimeUnit class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { + private val numMillisecondsForRetryDelay = 200L - object SimpleOperation { + // for whenReady calls + private implicit val defaultPatience: PatienceConfig = + PatienceConfig(timeout = Span(2, Seconds), interval = Span(10, Millis)) + private val defaultRetryDelay = + Duration(numMillisecondsForRetryDelay, TimeUnit.MILLISECONDS) - def operation(x: Int, y: Int): Int = x / y + def assertArithException(hint: String, timeoutLength: Int = 100)(implicit cb: CircuitBreaker): Assertion = + whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(timeoutLength, Millis))) { e => + withClue(s"${cb.name} : $hint") { + e shouldBe a[ArithmeticException] + } + } - def asyncOperation(x: Int, y: Int): Future[Int] = Future { x / y } + def assertArithExceptionAsync(hint: String, timeoutLength: Int = 100)( + implicit cb: CircuitBreaker + ): Assertion = + whenReady(protectedAsyncOperation(1, 0).failed, timeout(Span(timeoutLength, Millis))) { e => + withClue(s"${cb.name} : $hint") { + e shouldBe a[ArithmeticException] + } + } + + def protectedAsyncOperation(x: Int, y: Int)(implicit cb: CircuitBreaker): Future[Int] = cb.async() { + SimpleOperation.asyncOperation(x, y) } - // for whenReady calls - private implicit val defaultPatience = - PatienceConfig(timeout = Span(2, Seconds), interval = Span(10, Millis)) + def assertCircuitException(hint: String, timeoutLength: Int = 100)( + implicit cb: CircuitBreaker + ): Assertion = + whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(timeoutLength, Millis))) { e => + withClue(s"${cb.name} : $hint") { + e shouldBe a[CircuitBreakerBrokenException] + } + } - private val numMillisecondsForRetryDelay = 200L - private val defaultRetryDelay = Duration(numMillisecondsForRetryDelay, TimeUnit.MILLISECONDS) + def assertCircuitExceptionAsync(hint: String, timeoutLength: Int = 100)( + implicit cb: CircuitBreaker + ): Assertion = + whenReady(protectedAsyncOperation(1, 0).failed, timeout(Span(timeoutLength, Millis))) { e => + withClue(s"${cb.name} : $hint") { + e shouldBe a[CircuitBreakerBrokenException] + } + } - private def waitUntilRetryDelayHasExpired() = Thread.sleep(2 * numMillisecondsForRetryDelay) + def assertClosed(hint: String, timeoutLength: Int = 100)(implicit cb: CircuitBreaker): Assertion = + whenReady(Future(protectedOperation(2, 1), timeout(Span(timeoutLength, Millis)))) { + case (result, _) => + withClue(s"${cb.name} : $hint") { + result shouldEqual 2 + } + } - private def simpleBuilder(name: String, failLimit: Int, retryDelay: FiniteDuration) = - CircuitBreakerBuilder(name = name, failLimit = failLimit, retryDelay = retryDelay) + def protectedOperation(x: Int, y: Int)(implicit cb: CircuitBreaker): Int = + cb() { + SimpleOperation.operation(x, y) + } + + private def waitUntilRetryDelayHasExpired(millis: Option[Long] = None): Unit = + millis match { + case Some(x) => Thread.sleep(x) + case None => Thread.sleep(2 * numMillisecondsForRetryDelay) + } // CB builders that misbehave on (state change|invocation) listeners - either throwing or blocking // + one plain CB for baseline private def simpleBuilders( failLimit: Int = 2, retryDelay: FiniteDuration = defaultRetryDelay - ): List[CircuitBreakerBuilder] = List( - simpleBuilder("simple", failLimit, retryDelay), - simpleBuilder("invocation listeners both block", failLimit, retryDelay) - .withInvocationListeners(List(new CircuitBreakerInvocationListener { + ): List[CircuitBreakerBuilder] = { + val defaultThreadSleep = 5000 - override def onInvocationInFlowState(name: String) = Thread.sleep(5000) + List( + simpleBuilder("simple", failLimit, retryDelay), + simpleBuilder("invocation listeners both block", failLimit, retryDelay) + .withInvocationListeners(List(new CircuitBreakerInvocationListener { - override def onInvocationInBrokenState(name: String) = Thread.sleep(5000) - })), - simpleBuilder("invocation flow throws, invocation broken blocks", failLimit, retryDelay) - .withInvocationListeners(List(new CircuitBreakerInvocationListener { + override def onInvocationInFlowState(name: String): Unit = + Thread.sleep(defaultThreadSleep) - override def onInvocationInFlowState(name: String) = throw new Exception("boom") + override def onInvocationInBrokenState(name: String): Unit = + Thread.sleep(defaultThreadSleep) + })), + simpleBuilder("invocation flow throws, invocation broken blocks", failLimit, retryDelay) + .withInvocationListeners(List(new CircuitBreakerInvocationListener { - override def onInvocationInBrokenState(name: String) = Thread.sleep(5000) - })), - simpleBuilder("invocation flow blocks, invocation broken blocks", failLimit, retryDelay) - .withInvocationListeners(List(new CircuitBreakerInvocationListener { + override def onInvocationInFlowState(name: String): Unit = + throw new Exception("boom") - override def onInvocationInFlowState(name: String) = Thread.sleep(5000) + override def onInvocationInBrokenState(name: String): Unit = + Thread.sleep(defaultThreadSleep) + })), + simpleBuilder("invocation flow blocks, invocation broken blocks", failLimit, retryDelay) + .withInvocationListeners(List(new CircuitBreakerInvocationListener { - override def onInvocationInBrokenState(name: String) = throw new Exception("boom") - })), - simpleBuilder("invocation listeners both throw", failLimit, retryDelay) - .withInvocationListeners(List(new CircuitBreakerInvocationListener { + override def onInvocationInFlowState(name: String): Unit = + Thread.sleep(defaultThreadSleep) - override def onInvocationInFlowState(name: String) = throw new Exception("boom") + override def onInvocationInBrokenState(name: String): Unit = + throw new Exception("boom") + })), + simpleBuilder("invocation listeners both throw", failLimit, retryDelay) + .withInvocationListeners(List(new CircuitBreakerInvocationListener { - override def onInvocationInBrokenState(name: String) = throw new Exception("boom") - })), - simpleBuilder("state change listeners all throw", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onInvocationInFlowState(name: String): Unit = + throw new Exception("boom") - override def onInit(name: String) = throw new Exception("boom") + override def onInvocationInBrokenState(name: String): Unit = + throw new Exception("boom") + })), + simpleBuilder("state change listeners all throw", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onTrip(name: String) = throw new Exception("boom") + override def onInit(name: String): Unit = throw new Exception("boom") - override def onReset(name: String) = throw new Exception("boom") - })), - simpleBuilder("state change listeners all block", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onTrip(name: String): Unit = throw new Exception("boom") - override def onInit(name: String) = Thread.sleep(5000) + override def onReset(name: String): Unit = throw new Exception("boom") + })), + simpleBuilder("state change listeners all block", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onTrip(name: String) = Thread.sleep(5000) + override def onInit(name: String): Unit = + Thread.sleep(defaultThreadSleep) - override def onReset(name: String) = Thread.sleep(5000) - })), - simpleBuilder("state change onInit throws, onTrip, onReset block", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onTrip(name: String): Unit = + Thread.sleep(defaultThreadSleep) - override def onInit(name: String) = throw new Exception("boom") + override def onReset(name: String): Unit = + Thread.sleep(defaultThreadSleep) + })), + simpleBuilder("state change onInit throws, onTrip, onReset block", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onTrip(name: String) = Thread.sleep(5000) + override def onInit(name: String): Unit = throw new Exception("boom") - override def onReset(name: String) = Thread.sleep(5000) - })), - simpleBuilder("state change onTrip throws, onInit, onReset block", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onTrip(name: String): Unit = + Thread.sleep(defaultThreadSleep) - override def onInit(name: String) = Thread.sleep(5000) + override def onReset(name: String): Unit = + Thread.sleep(defaultThreadSleep) + })), + simpleBuilder("state change onTrip throws, onInit, onReset block", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onTrip(name: String) = throw new Exception("boom") + override def onInit(name: String): Unit = + Thread.sleep(defaultThreadSleep) - override def onReset(name: String) = Thread.sleep(5000) - })), - simpleBuilder("state change onReset throws, onInit, onReset block", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onTrip(name: String): Unit = throw new Exception("boom") - override def onInit(name: String) = Thread.sleep(5000) + override def onReset(name: String): Unit = + Thread.sleep(defaultThreadSleep) + })), + simpleBuilder("state change onReset throws, onInit, onReset block", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onTrip(name: String) = Thread.sleep(5000) + override def onInit(name: String): Unit = + Thread.sleep(defaultThreadSleep) - override def onReset(name: String) = throw new Exception("boom") - })) - ) + override def onTrip(name: String): Unit = + Thread.sleep(defaultThreadSleep) - "simple circuit breaker" should "record failures, trip, then reset after delay time has elapsed" in { + override def onReset(name: String): Unit = throw new Exception("boom") + })) + ) + } - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + private def simpleBuilder(name: String, failLimit: Int, retryDelay: FiniteDuration) = + CircuitBreakerBuilder(name = name, failLimit = failLimit, retryDelay = retryDelay) + + private def exponentialBuilders( + failLimit: Int = 2, + retryDelay: FiniteDuration = defaultRetryDelay, + retryCap: Int = 10 + ): List[CircuitBreakerBuilder] = + List(exponentialBuilder("simple", failLimit, retryDelay, retryCap)) + + private def exponentialBuilder( + name: String, + failLimit: Int, + retryDelay: FiniteDuration, + exponentialRetryCap: Int + ) = + CircuitBreakerBuilder( + name = name, + failLimit = failLimit, + retryDelay = retryDelay, + isExponentialBackoff = true, + exponentialRetryCap = Some(exponentialRetryCap) + ) + + object SimpleOperation { + + def operation(x: Int, y: Int): Int = x / y + def asyncOperation(x: Int, y: Int): Future[Int] = Future { + x / y + } + } + + "simple circuit breaker" should "record failures, trip, then reset after delay time has elapsed" in { + + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x // run all protectedOperation invocations async, so we can catch & timeout rogue, blocking listeners // 100ms should be more than enough for our SimpleOperation // first failure; let it through - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : first failure") { - e shouldBe a[ArithmeticException] - } - } + assertArithException("first failure") // second failure; let it through again but this should trip the circuit breaker - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : second failure") { - e shouldBe a[ArithmeticException] - } - } + assertArithException("second failure") // now we get a circuit breaker exception because the circuit breaker is open - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : CB broken failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("CB broken failure") //wait a bit waitUntilRetryDelayHasExpired() //circuit should now be closed and a valid operation should just work - whenReady(Future(protectedOperation(2, 1), timeout(Span(100, Millis)))) { - case (result, timeout) => - withClue(s"${cb.name} : after retry delay") { - result shouldEqual 2 - } - } + assertClosed("after retry delay") } } - it should "remain in tripped state on repeated errors" in { - simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) + it should "wait exponentially longer before retrying if is an exponential backoff CB" in { + + val baseRetryTime = 30 + val baseWaitTime = (baseRetryTime * 1.1).toInt + exponentialBuilders(retryDelay = Duration(baseRetryTime, TimeUnit.MILLISECONDS)).map(_.build()).foreach { + x => + implicit val cb: CircuitBreaker = x + + //should fail on first exception + assertArithException("1st should be Arithmetic") + + //should fail on second exception because the circuit has not been tripped + assertArithException("2nd should be Arithmetic") + + //next attempt should be circuit because we have reached the fail limit of 2 + assertCircuitException("3rd should be circuit") + + //wait the normal amount of time + waitUntilRetryDelayHasExpired(Some(baseWaitTime)) + + //should fail on first exception + assertArithException("4th should be Arithmetic") + + //next attempt should be circuit we are in a broken state, not a flow state + assertCircuitException("5th should be circuit") + + //wait a bit (this wont be enough, need to wait twice as long + waitUntilRetryDelayHasExpired(Some(baseWaitTime)) + + //next attempt should fail, we didn't wait long enough + assertCircuitException("6th should be circuit") + + //wait a lot longer + waitUntilRetryDelayHasExpired(Some(baseWaitTime * 2)) + + //circuit should now be closed and a valid operation should just work + assertClosed("after exponential retry delay") + } + } + + it should "wait exponentially longer before retrying if is an exponential backoff CB but stop " + + "increasing at limit" in { + + val baseRetryTime = 100 + val baseWaitTime = baseRetryTime * 1.1 + val retryCap = 2 + exponentialBuilders(retryDelay = Duration(baseRetryTime, TimeUnit.MILLISECONDS), retryCap = retryCap) + .map(_.build()) + .foreach { x => + implicit val cb: CircuitBreaker = x + + //should fail on first exception + assertArithException("1st should be Arithmetic") + + //should fail on second exception because the circuit has not been tripped + assertArithException("2nd should be Arithmetic") + + //next attempt should be circuit because we have reached the fail limit of 2 + assertCircuitException("3rd should be circuit") + + //wait the normal amount of time + waitUntilRetryDelayHasExpired(Some(baseRetryTime)) + + //should fail on first exception + assertArithException("4th should be Arithmetic") + + //next attempt should be circuit we are in a broken state, not a flow state + assertCircuitException("5th should be circuit") + + //wait a bit (this wont be enough, need to wait 2^1x as long + waitUntilRetryDelayHasExpired(Some((baseWaitTime * 2).toInt)) + + //should fail on first exception + assertArithException("6th should be Arithmetic") + + //next attempt should be circuit we are in a broken state, not a flow state + assertCircuitException("7th should be circuit") + + //wait a bit longer this time 2^2x + waitUntilRetryDelayHasExpired(Some((baseWaitTime * 4).toInt)) + + //should fail on first exception + assertArithException("8th should be Arithmetic") + + //next attempt should be circuit we are in a broken state, not a flow state + assertCircuitException("9th should be circuit") + + //wait a bit longer this time 2^2x + waitUntilRetryDelayHasExpired(Some((baseWaitTime * 4).toInt)) + + //if the cap was not used, we we have to wait 2^3x as long, so only waiting + //2^2x should be long enough + + //should fail on first exception + assertArithException("8th should be Arithmetic") + } + } + + it should "remain in tripped state on repeated errors" in { + simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x (1 to 2).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithException(s"failure #$i") } // the CB is now tripped // repeated calls will just fail immediately, regardless of call (1 to 100).foreach { i => - // this call is legal but CB is in tripped state - whenReady(Future(protectedOperation(2, 1)).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : attempt #$i while CB tripped") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException(s"attempt #$i while CB tripped") } } } it should "be 'waiting' if it is in a tripped state, but the reset time delay has been reached" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x cb.isFlowing shouldBe true cb.isBroken shouldBe false cb.isWaiting shouldBe false (1 to 2).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithException(s"failure #$i") } cb.isFlowing shouldBe false @@ -219,86 +380,52 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } it should "let call through to underlying function when reset time delay reached, even with error" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x (1 to 2).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithException(s"failure #$i") } // now we get a circuit breaker exception because the circuit breaker is open - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB broken failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("CB broken failure") //wait until after the reset delay time has elapsed waitUntilRetryDelayHasExpired() // CB should let this call go through to test whether or not we need reset - will fail - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : test reset attempt after retry delay") { - e shouldBe a[ArithmeticException] - } - } + assertArithException("test reset attempt after retry delay") // back to tripped state but with retry delay now reset, fail immediately with CB exception - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB remain broken after failed reset attempt") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("CB remain broken after failed reset attempt", 10) } } "simple async circuit breaker" should "record failures, trip, then reset after delay time has elapsed" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x (1 to 2).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync(s"failure #$i") } // now we get a circuit breaker exception because the circuit breaker is open - whenReady(protectedOperation(1, 0).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB broken failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("CB broken failure", 10) //wait a bit waitUntilRetryDelayHasExpired() //circuit should now be closed and a valid operation should just work - whenReady(protectedOperation(2, 1), timeout(Span(100, Millis))) { result => - withClue(s"${cb.name} : after retry delay") { - result shouldEqual 2 - } - } + assertClosed("after retry delay") } } it should "remain in tripped state on repeated errors" in { - simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x - def syncProtectedOperation(x: Int, y: Int): Int = - Await.result(protectedOperation(x, y), Duration(1, TimeUnit.SECONDS)) + def syncProtectedOperation(x: Int, y: Int)(implicit cb: CircuitBreaker): Int = + Await.result(protectedAsyncOperation(x, y), Duration(1, TimeUnit.SECONDS)) (1 to 2).foreach { i => whenReady(Future(syncProtectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => @@ -323,21 +450,15 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } it should "be 'waiting' if it is in a tripped state, but the reset time delay has been reached" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x cb.isFlowing shouldBe true cb.isBroken shouldBe false cb.isWaiting shouldBe false (1 to 2).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync(s"failure #$i") } cb.isFlowing shouldBe false @@ -355,65 +476,46 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { it should "let call through to underlying function when reset time delay reached, even with error" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int): Future[Int] = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x (1 to 2).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync(s"failure #$i") } // now we get a circuit breaker exception because the circuit breaker is open - whenReady(protectedOperation(1, 0).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB broken failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("CB Broken Failure") //wait until after the reset delay time has elapsed waitUntilRetryDelayHasExpired() // CB should let this call go through to test whether or not we need reset - will fail - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : test reset attempt after retry delay") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync("test reset attempt after retry delay") // back to tripped state but with retry delay now reset, fail immediately with CB exception - whenReady(protectedOperation(1, 0).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB remain broken after failed reset attempt") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("CB remain broken after failed reset attempt", 10) } } "circuit breaker with fallback value" should "return the fallback value when the circuit breaker is tripped" in { - simpleBuilders(failLimit = 5).map(_.build()).foreach { cb => + simpleBuilders(failLimit = 5).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x + val theDefaultValue = 0 // a bit of an odd behaviour for the fallback in this case, but tests the basic idea - def protectedOperation(x: Int, y: Int) = cb(fallback = Some(Success(theDefaultValue))) { - SimpleOperation.operation(x, y) - } + def fallbackOperation(x: Int, y: Int) = + cb(fallback = Some(Success(theDefaultValue))) { + SimpleOperation.operation(x, y) + } (1 to 5).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithException(s"failure #$i") } // return the default value here, while the circuit is tripped - whenReady(Future(protectedOperation(222, 1), timeout(Span(100, Millis)))) { - case (result, timeout) => + whenReady(Future(fallbackOperation(222, 1), timeout(Span(100, Millis)))) { + case (result, _) => withClue(s"${cb.name} : after retry delay") { result shouldEqual theDefaultValue } @@ -422,24 +524,23 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } it should "throw a custom exception when configured as such and when the circuit breaker is tripped" in { - simpleBuilders(failLimit = 5).map(_.build()).foreach { cb => + simpleBuilders(failLimit = 5).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x + class CustomException extends Throwable val customFailure = Failure(new CustomException) // a bit of an odd behaviour for the fallback in this case, but tests the basic idea - def protectedOperation(x: Int, y: Int) = cb(fallback = Some(customFailure)) { - SimpleOperation.operation(x, y) - } + def customExceptionOperation(x: Int, y: Int) = + cb(fallback = Some(customFailure)) { + SimpleOperation.operation(x, y) + } (1 to 5).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithException(s"failure #$i") } - whenReady(Future(protectedOperation(2, 1)).failed, timeout(Span(10, Millis))) { e => + whenReady(Future(customExceptionOperation(2, 1)).failed, timeout(Span(10, Millis))) { e => withClue(s"${cb.name} : custom fallback failure") { e shouldBe a[CustomException] } @@ -448,24 +549,23 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } "async circuit breaker with fallback value" should "return the fallback value when the circuit breaker is tripped" in { - simpleBuilders(failLimit = 5).map(_.build()).foreach { cb => + simpleBuilders(failLimit = 5).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x + val theDefaultValue = 0 // a bit of an odd behaviour for the fallback in this case, but tests the basic idea - def protectedOperation(x: Int, y: Int) = cb.async(fallback = Some(Success(theDefaultValue))) { - SimpleOperation.asyncOperation(x, y) - } + def fallbackOperation(x: Int, y: Int) = + cb.async(fallback = Some(Success(theDefaultValue))) { + SimpleOperation.asyncOperation(x, y) + } (1 to 5).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync(s"failure #$i") } // return the default value here, while the circuit is tripped - whenReady(protectedOperation(222, 1), timeout(Span(100, Millis))) { result => + whenReady(fallbackOperation(222, 1), timeout(Span(100, Millis))) { result => withClue(s"${cb.name} : after retry delay") { result shouldEqual theDefaultValue } @@ -474,24 +574,23 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } it should "throw a custom exception when configured as such and when the circuit breaker is tripped" in { - simpleBuilders(failLimit = 5).map(_.build()).foreach { cb => + simpleBuilders(failLimit = 5).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x + class CustomException extends Throwable val customFailure = Failure(new CustomException) // a bit of an odd behaviour for the fallback in this case, but tests the basic idea - def protectedOperation(x: Int, y: Int) = cb.async(fallback = Some(customFailure)) { - SimpleOperation.asyncOperation(x, y) - } + def fallbackOperationWithCustomException(x: Int, y: Int) = + cb.async(fallback = Some(customFailure)) { + SimpleOperation.asyncOperation(x, y) + } (1 to 5).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync(s"failure #$i") } - whenReady(protectedOperation(2, 1).failed, timeout(Span(10, Millis))) { e => + whenReady(fallbackOperationWithCustomException(2, 1).failed, timeout(Span(10, Millis))) { e => withClue(s"${cb.name} : custom fallback failure") { e shouldBe a[CustomException] } @@ -505,32 +604,25 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { case i: Int if i == 2 => true // whenever the returned value is 2, the circuit breaker should mark as a failure }.build()) - .foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + .foreach { x => + implicit val cb: CircuitBreaker = x whenReady(Future(protectedOperation(2, 1), timeout(Span(100, Millis)))) { - case (result, timeout) => + case (result, _) => withClue(s"${cb.name} : still returns the value but records it as a failure") { result shouldEqual 2 } } whenReady(Future(protectedOperation(16, 8), timeout(Span(100, Millis)))) { - case (result, timeout) => + case (result, _) => withClue(s"${cb.name} : still returns the value but records it as a failure, trip CB") { result shouldEqual 2 } } // circuit breaker is now open - whenReady(Future(protectedOperation(3, 1)).failed, timeout(Span(10, Millis))) { e => - // the underlying function is never called, so the return value doesn't matter at this point - withClue(s"${cb.name} : custom fallback failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("custom fallback failure") } } @@ -539,12 +631,16 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { .map(_.withResultFailureCases { case _ => throw new Exception("a dumb thing to do") }.build()) - .foreach { cb => - def protectedOperation() = cb() { 1 } + .foreach { x => + implicit val cb: CircuitBreaker = x + + def protectedOperation() = cb() { + 1 + } // this should not blow up with an exception whenReady(Future(protectedOperation(), timeout(Span(100, Millis)))) { - case (result, timeout) => + case (result, _) => withClue(s"${cb.name} : still return result when failure function throws") { result shouldEqual 1 } @@ -558,30 +654,23 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { case i: Int if i == 2 => true // whenever the returned value is 2, the circuit breaker should mark as a failure }.build()) - .foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + .foreach { x => + implicit val cb: CircuitBreaker = x - whenReady(protectedOperation(2, 1), timeout(Span(100, Millis))) { result => + whenReady(protectedAsyncOperation(2, 1), timeout(Span(100, Millis))) { result => withClue(s"${cb.name} : still returns the value but records it as a failure") { result shouldEqual 2 } } - whenReady(protectedOperation(16, 8), timeout(Span(100, Millis))) { result => + whenReady(protectedAsyncOperation(16, 8), timeout(Span(100, Millis))) { result => withClue(s"${cb.name} : still returns the value but records it as a failure, trip CB") { result shouldEqual 2 } } // circuit breaker is now open - whenReady(protectedOperation(3, 1).failed, timeout(Span(10, Millis))) { e => - // the underlying function is never called, so the return value doesn't matter at this point - withClue(s"${cb.name} : custom fallback failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("custom fallback failure", 10) } } @@ -591,8 +680,14 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { // the following is a terrible idea - don't do this; but if someone does, ignore and log case _ => throw new Exception("a dumb thing to do") }.build()) - .foreach { cb => - def protectedOperation(): Future[Int] = cb.async() { Future { 1 } } + .foreach { x => + implicit val cb: CircuitBreaker = x + + def protectedOperation(): Future[Int] = cb.async() { + Future { + 1 + } + } // this should not blow up with an exception whenReady(protectedOperation(), timeout(Span(100, Millis))) { result => @@ -606,20 +701,14 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { "circuit breaker configured to ignore certain exceptions" should "not be tripped when these exceptions occur" in { simpleBuilders() .map(_.withNonFailureExceptionCases { - case e: ArithmeticException => true + case _: ArithmeticException => true }.build()) - .foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + .foreach { x => + implicit val cb: CircuitBreaker = x // the circuit is never tripped and just lets these exceptions through (1 to 10).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithException(s"failure #$i") } } } @@ -627,22 +716,21 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { it should "not ignore exceptions that have not be configured to be ignored" in { simpleBuilders() .map(_.withNonFailureExceptionCases { - case e: ArithmeticException => true + case _: ArithmeticException => true }.build()) - .foreach { cb => + .foreach { x => + implicit val cb: CircuitBreaker = x + def protectedOperation(x: Int, y: Int) = cb() { val ret = SimpleOperation.operation(x, y) - if (ret == 3) throw new IllegalStateException("just here to verify this exception is not ignored") + if (ret == 3) + throw new IllegalStateException("just here to verify this exception is not ignored") ret } // the circuit is never tripped and just lets these exceptions through (1 to 10).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : (non) failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithException(s"(non) failure #$i") } whenReady(Future(protectedOperation(3, 1)).failed, timeout(Span(100, Millis))) { e => @@ -657,11 +745,7 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } // CB is now tripped - whenReady(Future(protectedOperation(2, 1)).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : tripped failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("tripped failure", 10) } } @@ -671,12 +755,16 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { // the following is a terrible idea - don't do this; but if someone does, ignore and log case _ => throw new Exception("a dumb thing to do") }.build()) - .foreach { cb => - def protectedOperation() = cb() { 1 } + .foreach { x => + implicit val cb: CircuitBreaker = x + + def protectedOperation() = cb() { + 1 + } // this should not blow up with an exception whenReady(Future(protectedOperation(), timeout(Span(100, Millis)))) { - case (result, timeout) => + case (result, _) => withClue(s"${cb.name} : still return result when non-failure function throws") { result shouldEqual 1 } @@ -687,20 +775,14 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { "async circuit breaker configured to ignore certain exceptions" should "not be tripped when these exceptions occur" in { simpleBuilders() .map(_.withNonFailureExceptionCases { - case e: ArithmeticException => true + case _: ArithmeticException => true }.build()) - .foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + .foreach { x => + implicit val cb: CircuitBreaker = x // the circuit is never tripped and just lets these exceptions through (1 to 10).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync(s"failure #$i") } } } @@ -708,9 +790,11 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { it should "not ignore exceptions that have not be configured to be ignored" in { simpleBuilders() .map(_.withNonFailureExceptionCases { - case e: ArithmeticException => true + case _: ArithmeticException => true }.build()) - .foreach { cb => + .foreach { x => + implicit val cb: CircuitBreaker = x + def protectedOperation(x: Int, y: Int) = cb.async() { Future { val ret = SimpleOperation.operation(x, y) @@ -722,11 +806,7 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { // the circuit is never tripped and just lets these exceptions through (1 to 10).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : (non) failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync(s"failure #$i") } whenReady(protectedOperation(3, 1).failed, timeout(Span(100, Millis))) { e => @@ -741,11 +821,7 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } // CB is now tripped - whenReady(protectedOperation(2, 1).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : tripped failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("tripped failure", 10) } } @@ -755,8 +831,12 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { // the following is a terrible idea - don't do this; but if someone does, ignore and log case _ => throw new Exception("a dumb thing to do") }.build()) - .foreach { cb => - def protectedOperation() = cb.async() { Future(1) } + .foreach { x => + implicit val cb: CircuitBreaker = x + + def protectedOperation() = cb.async() { + Future(1) + } // this should not blow up with an exception whenReady(protectedOperation(), timeout(Span(100, Millis))) { result => @@ -775,25 +855,20 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { // create a state change listener - we'll monitor its onTrip() and onReset() invocations val myStateChangeListener = new CircuitBreakerStateChangeListener { - override def onTrip(resourceName: String): Unit = promiseIllTrip.success(true) + override def onTrip(resourceName: String): Unit = + promiseIllTrip.success(true) - override def onReset(resourceName: String): Unit = promiseIllReset.success(true) + override def onReset(resourceName: String): Unit = + promiseIllReset.success(true) } - val circuitBreaker = CircuitBreakerBuilder("notifyStateChangeListeners", 2, defaultRetryDelay) - .withStateChangeListeners(List(myStateChangeListener)) - .build() - - def protectedOperation(x: Int, y: Int) = circuitBreaker() { - SimpleOperation.operation(x, y) - } + implicit val circuitBreaker: CircuitBreaker = + CircuitBreakerBuilder("notifyStateChangeListeners", 2, defaultRetryDelay) + .withStateChangeListeners(List(myStateChangeListener)) + .build() (1 to 2).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${circuitBreaker.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } + assertArithException(s"failure #$i") } // notified async / on separate thread of CB being *tripped*, wait a bit for it @@ -823,22 +898,36 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { "circuit breaker configured with invocation listeners" should "notify listeners on invocations" in { - val promiseCheckpoint1 = Promise[(Int, Int)]() - val promiseCheckpoint2 = Promise[(Int, Int)]() - val promiseCheckpoint3 = Promise[(Int, Int)]() - val promiseCheckpoint4 = Promise[(Int, Int)]() + val promiseCheckpoint1 = Promise[(Int, Int, Int)]() + val promiseCheckpoint2 = Promise[(Int, Int, Int)]() + val promiseCheckpoint3 = Promise[(Int, Int, Int)]() + val promiseCheckpoint4 = Promise[(Int, Int, Int)]() val myInvocationListener = new CircuitBreakerInvocationListener { var flowStateInvocationCount = 0 var brokenStateInvocationCount = 0 - - private def keepPromises(): Unit = (flowStateInvocationCount, brokenStateInvocationCount) match { - case (10, 0) => promiseCheckpoint1.success((flowStateInvocationCount, brokenStateInvocationCount)) - case (12, 0) => promiseCheckpoint2.success((flowStateInvocationCount, brokenStateInvocationCount)) - case (12, 10) => promiseCheckpoint3.success((flowStateInvocationCount, brokenStateInvocationCount)) - case (13, 11) => promiseCheckpoint4.success((flowStateInvocationCount, brokenStateInvocationCount)) - case _ => - } + var attemptResetStateInvocationCount = 0 + + private def keepPromises(): Unit = + (flowStateInvocationCount, brokenStateInvocationCount, attemptResetStateInvocationCount) match { + case (10, 0, 0) => + promiseCheckpoint1.success( + (flowStateInvocationCount, brokenStateInvocationCount, attemptResetStateInvocationCount) + ) + case (12, 0, 0) => + promiseCheckpoint2.success( + (flowStateInvocationCount, brokenStateInvocationCount, attemptResetStateInvocationCount) + ) + case (12, 10, 0) => + promiseCheckpoint3.success( + (flowStateInvocationCount, brokenStateInvocationCount, attemptResetStateInvocationCount) + ) + case (13, 10, 1) => + promiseCheckpoint4.success( + (flowStateInvocationCount, brokenStateInvocationCount, attemptResetStateInvocationCount) + ) + case _ => + } override def onInvocationInFlowState(resourceName: String): Unit = { flowStateInvocationCount = flowStateInvocationCount + 1 @@ -849,26 +938,29 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { brokenStateInvocationCount += 1 keepPromises() } - } - val circuitBreaker = CircuitBreakerBuilder("notifyInvocationListeners", 2, defaultRetryDelay) - .withInvocationListeners(List(myInvocationListener)) - .build() - - def protectedOperation(x: Int, y: Int) = circuitBreaker() { - SimpleOperation.operation(x, y) + override def onInvocationInAttemptResetState(resourceName: String): Unit = { + attemptResetStateInvocationCount += 1 + keepPromises() + } } + implicit val circuitBreaker: CircuitBreaker = + CircuitBreakerBuilder("notifyInvocationListeners", 2, defaultRetryDelay) + .withInvocationListeners(List(myInvocationListener)) + .build() + // 10 invocations in flow state (1 to 10).foreach { _ => protectedOperation(1, 1) } whenReady(promiseCheckpoint1.future, timeout(Span(50, Millis))) { - case (invoked, broken) => + case (invoked, broken, attempt) => withClue("expected (10 invocation, 0 broken) notifications at checkpoint 1: ") { invoked shouldEqual 10 broken shouldEqual 0 + attempt shouldEqual 0 } } @@ -880,10 +972,11 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } whenReady(promiseCheckpoint2.future, timeout(Span(50, Millis))) { - case (invoked, broken) => + case (invoked, broken, attempt) => withClue("expected (12 invocation, 0 broken) notifications at checkpoint 2: ") { invoked shouldEqual 12 broken shouldEqual 0 + attempt shouldEqual 0 } } @@ -895,10 +988,11 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } whenReady(promiseCheckpoint3.future, timeout(Span(50, Millis))) { - case (invoked, broken) => + case (invoked, broken, attempt) => withClue("expected (12 invocation, 10 broken) notifications at checkpoint 3: ") { invoked shouldEqual 12 broken shouldEqual 10 + attempt shouldEqual 0 } } @@ -912,18 +1006,21 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { protectedOperation(2, 1) whenReady(promiseCheckpoint4.future, timeout(Span(50, Millis))) { - case (invoked, broken) => + case (invoked, broken, attempt) => withClue("expected (13 invocation, 11 broken) notifications at checkpoint 4: ") { invoked shouldEqual 13 - broken shouldEqual 11 + broken shouldEqual 10 + attempt shouldEqual 1 } } } "circuit breaker creation" should "register circuit breaker with the shared registry" in { - val circuitBreaker = CircuitBreakerBuilder("register", 2, defaultRetryDelay).build() + val circuitBreaker = + CircuitBreakerBuilder("register", 2, defaultRetryDelay).build() - val maybeRegisteredCircuitBreaker = CircuitBreakerRegistry.get(circuitBreaker.name) + val maybeRegisteredCircuitBreaker = + CircuitBreakerRegistry.get(circuitBreaker.name) maybeRegisteredCircuitBreaker should be('defined) val registeredCircuitBreaker = maybeRegisteredCircuitBreaker.get @@ -934,17 +1031,21 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } "circuit breaker exception checking" should "let ControlThrowable exceptions through without affecting the state of the circuit breaker" in { - val circuitBreaker = CircuitBreakerBuilder("controlThrowable", 2, defaultRetryDelay).build() + val circuitBreaker = + CircuitBreakerBuilder("controlThrowable", 2, defaultRetryDelay).build() - val protectedOperation = circuitBreaker() { new ControlThrowable {} } + val protectedOperation = circuitBreaker() { + new ControlThrowable {} + } // the circuit breaker is never tripped (1 to 5).foreach { _ => try { protectedOperation } catch { - case e: CircuitBreakerBrokenException => fail("the circuit breaker should not have been tripped") - case e: ControlThrowable => //cool + case _: CircuitBreakerBrokenException => + fail("the circuit breaker should not have been tripped") + case _: ControlThrowable => //cool case e: Throwable => throw e } }