Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class CircuitBreaker private[circuitbreaker] (
val name: String,
val failLimit: Int,
val retryDelay: FiniteDuration,
val isExponentialBackoff: Boolean = false,
val isResultFailure: PartialFunction[Any, Boolean] = { case _ => false },
val isExceptionNotFailure: PartialFunction[Throwable, Boolean] = { case _ => false },
val stateChangeListeners: List[CircuitBreakerStateChangeListener] = List(),
Expand All @@ -38,6 +39,7 @@ class CircuitBreaker private[circuitbreaker] (
builder.name,
builder.failLimit,
builder.retryDelay,
builder.isExponentialBackoff,
builder.isResultFailure,
builder.isExceptionNotFailure,
builder.stateChangeListeners,
Expand Down Expand Up @@ -180,9 +182,9 @@ class CircuitBreaker private[circuitbreaker] (
* @param currentState the expected current state
* @return true when the state was changed, false when the given state was not the current state
*/
def attemptResetBrokenState(currentState: BrokenState): Boolean = {
def attemptResetBrokenState(currentState: BrokenState, retryCount: Int): Boolean = {
logger.debug(s"Circuit breaker \'$name\', attempting to reset open/broken state")
state.compareAndSet(currentState, new BrokenState(this))
state.compareAndSet(currentState, new BrokenState(this, retryCount))
}

/**
Expand Down Expand Up @@ -286,7 +288,7 @@ private object CircuitBreaker {
override def onFailure(): Unit =
incrementFailure()

private[this] def incrementFailure() = {
private[this] def incrementFailure(): Unit = {
val currentCount = failureCount.incrementAndGet
logger.debug(
s"Circuit breaker ${cb.name} increment failure count to $currentCount; fail limit is ${cb.failLimit}"
Expand All @@ -298,8 +300,10 @@ private object CircuitBreaker {
/**
* CircuitBreaker is opened/broken. Invocations fail immediately.
*/
class BrokenState(cb: CircuitBreaker) extends State {
val retryAt: Long = System.currentTimeMillis() + cb.retryDelay.toMillis
class BrokenState(cb: CircuitBreaker, retryCount: Int=0) extends State {
val retryDelay: Long = cb.retryDelay.toMillis * Math.pow(2,
if (cb.isExponentialBackoff) retryCount else 1).toLong
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when isExponentialBackoff is false, this will make the cb.retryDelay.toMillis to be multiplied by 2 as Math.pow(2.0, 1.0) == 2.0

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, good catch, I changed this. Will address.

val retryAt: Long = System.currentTimeMillis() + retryDelay

override def preInvoke(): Unit = {
cb.invocationListeners.foreach { listener =>
Expand All @@ -311,7 +315,7 @@ private object CircuitBreaker {
}

val retry = System.currentTimeMillis > retryAt
if (!(retry && cb.attemptResetBrokenState(this))) {
if (!(retry && cb.attemptResetBrokenState(this, this.retryCount + 1))) {
throw new CircuitBreakerBrokenException(
cb.name,
s"Making ${cb.name} unavailable after ${cb.failLimit} errors"
Expand Down Expand Up @@ -345,6 +349,7 @@ case class CircuitBreakerBuilder(
name: String,
failLimit: Int,
retryDelay: FiniteDuration,
isExponentialBackoff: Boolean = false,
isResultFailure: PartialFunction[Any, Boolean] = { case _ => false },
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably add a maxRetryDelay: FiniteDuration that bounds the exponential growth.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would also be nice if there were a withExponentialBackoff(maxRetryDelay: FiniteDuration): CircuitBreakerBuilder convenience method on the builder

isExceptionNotFailure: PartialFunction[Throwable, Boolean] = { case _ => false },
stateChangeListeners: List[CircuitBreakerStateChangeListener] = List(),
Expand Down
161 changes: 130 additions & 31 deletions src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,66 +9,67 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{Await, Future, Promise}
import scala.util.control.ControlThrowable
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}
import java.util.concurrent.TimeUnit

class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {

object SimpleOperation {

def operation(x: Int, y: Int): Int = x / y

def asyncOperation(x: Int, y: Int): Future[Int] = Future { x / y }
}
private val numMillisecondsForRetryDelay = 200L

// for whenReady calls
private implicit val defaultPatience =
PatienceConfig(timeout = Span(2, Seconds), interval = Span(10, Millis))

private val numMillisecondsForRetryDelay = 200L
private val defaultRetryDelay = Duration(numMillisecondsForRetryDelay, TimeUnit.MILLISECONDS)

private def waitUntilRetryDelayHasExpired() = Thread.sleep(2 * numMillisecondsForRetryDelay)
private def waitUntilRetryDelayHasExpired(millis: Option[Long]=None) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 why scalafmt didn't fix this...

millis match {
case Some(x) => Thread.sleep(x)
case None => Thread.sleep(2 * numMillisecondsForRetryDelay)
}

private def simpleBuilder(name: String, failLimit: Int, retryDelay: FiniteDuration) =
CircuitBreakerBuilder(name = name, failLimit = failLimit, retryDelay = retryDelay)
}

// CB builders that misbehave on (state change|invocation) listeners - either throwing or blocking
// + one plain CB for baseline
private def simpleBuilders(
failLimit: Int = 2,
retryDelay: FiniteDuration = defaultRetryDelay
): List[CircuitBreakerBuilder] = List(
simpleBuilder("simple", failLimit, retryDelay),
simpleBuilder("invocation listeners both block", failLimit, retryDelay)
failLimit: Int = 2,
retryDelay: FiniteDuration = defaultRetryDelay,
isExponentialBackoff: Boolean = false
): List[CircuitBreakerBuilder] = List(
simpleBuilder("simple", failLimit, retryDelay, isExponentialBackoff),
simpleBuilder("invocation listeners both block", failLimit,
retryDelay, isExponentialBackoff)
.withInvocationListeners(List(new CircuitBreakerInvocationListener {

override def onInvocationInFlowState(name: String) = Thread.sleep(5000)

override def onInvocationInBrokenState(name: String) = Thread.sleep(5000)
})),
simpleBuilder("invocation flow throws, invocation broken blocks", failLimit, retryDelay)
simpleBuilder("invocation flow throws, invocation broken blocks", failLimit,
retryDelay, isExponentialBackoff)
.withInvocationListeners(List(new CircuitBreakerInvocationListener {

override def onInvocationInFlowState(name: String) = throw new Exception("boom")

override def onInvocationInBrokenState(name: String) = Thread.sleep(5000)
})),
simpleBuilder("invocation flow blocks, invocation broken blocks", failLimit, retryDelay)
simpleBuilder("invocation flow blocks, invocation broken blocks", failLimit,
retryDelay, isExponentialBackoff)
.withInvocationListeners(List(new CircuitBreakerInvocationListener {

override def onInvocationInFlowState(name: String) = Thread.sleep(5000)

override def onInvocationInBrokenState(name: String) = throw new Exception("boom")
})),
simpleBuilder("invocation listeners both throw", failLimit, retryDelay)
simpleBuilder("invocation listeners both throw", failLimit,
retryDelay, isExponentialBackoff)
.withInvocationListeners(List(new CircuitBreakerInvocationListener {

override def onInvocationInFlowState(name: String) = throw new Exception("boom")

override def onInvocationInBrokenState(name: String) = throw new Exception("boom")
})),
simpleBuilder("state change listeners all throw", failLimit, retryDelay)
simpleBuilder("state change listeners all throw", failLimit,
retryDelay, isExponentialBackoff)
.withStateChangeListeners(List(new CircuitBreakerStateChangeListener {

override def onInit(name: String) = throw new Exception("boom")
Expand All @@ -77,7 +78,8 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {

override def onReset(name: String) = throw new Exception("boom")
})),
simpleBuilder("state change listeners all block", failLimit, retryDelay)
simpleBuilder("state change listeners all block", failLimit,
retryDelay, isExponentialBackoff)
.withStateChangeListeners(List(new CircuitBreakerStateChangeListener {

override def onInit(name: String) = Thread.sleep(5000)
Expand All @@ -86,7 +88,8 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {

override def onReset(name: String) = Thread.sleep(5000)
})),
simpleBuilder("state change onInit throws, onTrip, onReset block", failLimit, retryDelay)
simpleBuilder("state change onInit throws, onTrip, onReset block", failLimit,
retryDelay, isExponentialBackoff)
.withStateChangeListeners(List(new CircuitBreakerStateChangeListener {

override def onInit(name: String) = throw new Exception("boom")
Expand All @@ -95,7 +98,8 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {

override def onReset(name: String) = Thread.sleep(5000)
})),
simpleBuilder("state change onTrip throws, onInit, onReset block", failLimit, retryDelay)
simpleBuilder("state change onTrip throws, onInit, onReset block", failLimit,
retryDelay, isExponentialBackoff)
.withStateChangeListeners(List(new CircuitBreakerStateChangeListener {

override def onInit(name: String) = Thread.sleep(5000)
Expand All @@ -104,7 +108,8 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {

override def onReset(name: String) = Thread.sleep(5000)
})),
simpleBuilder("state change onReset throws, onInit, onReset block", failLimit, retryDelay)
simpleBuilder("state change onReset throws, onInit, onReset block", failLimit,
retryDelay, isExponentialBackoff)
.withStateChangeListeners(List(new CircuitBreakerStateChangeListener {

override def onInit(name: String) = Thread.sleep(5000)
Expand All @@ -115,6 +120,21 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {
}))
)

private def simpleBuilder(name: String, failLimit: Int,
retryDelay: FiniteDuration,
isExponentialBackoff: Boolean) =
CircuitBreakerBuilder(name = name, failLimit = failLimit,
retryDelay = retryDelay, isExponentialBackoff = isExponentialBackoff)

object SimpleOperation {

def operation(x: Int, y: Int): Int = x / y

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of division by zero, you are going to want this to be Try-able: def operation(x: Int, y: Int): Try[Int] = Try(x / y)


def asyncOperation(x: Int, y: Int): Future[Int] = Future {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider: def asyncOperation(x: Int, y: int): Future[Try[Int]] = Future(operation(x, y))

x / y
}
}

"simple circuit breaker" should "record failures, trip, then reset after delay time has elapsed" in {

simpleBuilders().map(_.build()).foreach { cb =>
Expand Down Expand Up @@ -149,6 +169,72 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {
//wait a bit
waitUntilRetryDelayHasExpired()


//circuit should now be closed and a valid operation should just work
whenReady(Future(protectedOperation(2, 1), timeout(Span(100, Millis)))) {
case (result, timeout) =>
withClue(s"${cb.name} : after retry delay") {
result shouldEqual 2
}
}
}
}

it should "wait exponentially longer before retrying if is an exponential backoff CB" in {

val baseRetryTime = 30
simpleBuilders(retryDelay = Duration(baseRetryTime, TimeUnit.MILLISECONDS),
isExponentialBackoff = true).map(_.build()).foreach { cb =>
def protectedOperation(x: Int, y: Int) = cb() {
SimpleOperation.operation(x, y)
}

def assertArithException(hint: String) = {
whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e =>
withClue(s"${cb.name} : $hint") {
e shouldBe a[ArithmeticException]
}
}
}

def assertCircuitException(hint: String) = {
whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e =>
withClue(s"${cb.name} : $hint") {
e shouldBe a[CircuitBreakerBrokenException]
}
}
}

val start_time = System.currentTimeMillis()


//should fail on first exception
assertArithException("1st should be Arithmetic")

//should fail on second exception because the circuit has not been tripped
assertArithException("2nd should be Arithmetic")

//next attempt should be circuit because we have reached the fail limit of 2
assertCircuitException("3rd should be circuit")

//wait the normal amount of time
waitUntilRetryDelayHasExpired(Some(baseRetryTime))

//should fail on first exception
assertArithException("4th should be Arithmetic")

//next attempt should be circuit we are in a broken state, not a flow state
assertCircuitException("5th should be circuit")

//wait a bit (this wont be enough, need to wait twice as long
waitUntilRetryDelayHasExpired(Some(baseRetryTime))

//next attempt should fail, we didn't wait long enough
assertCircuitException("6th should be circuit")

//wait a lot longer
waitUntilRetryDelayHasExpired(Some(baseRetryTime * 2))

//circuit should now be closed and a valid operation should just work
whenReady(Future(protectedOperation(2, 1), timeout(Span(100, Millis)))) {
case (result, timeout) =>
Expand All @@ -159,6 +245,7 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {
}
}


it should "remain in tripped state on repeated errors" in {
simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { cb =>
def protectedOperation(x: Int, y: Int) = cb() {
Expand Down Expand Up @@ -540,7 +627,9 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {
case _ => throw new Exception("a dumb thing to do")
}.build())
.foreach { cb =>
def protectedOperation() = cb() { 1 }
def protectedOperation() = cb() {
1
}

// this should not blow up with an exception
whenReady(Future(protectedOperation(), timeout(Span(100, Millis)))) {
Expand Down Expand Up @@ -592,7 +681,11 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {
case _ => throw new Exception("a dumb thing to do")
}.build())
.foreach { cb =>
def protectedOperation(): Future[Int] = cb.async() { Future { 1 } }
def protectedOperation(): Future[Int] = cb.async() {
Future {
1
}
}

// this should not blow up with an exception
whenReady(protectedOperation(), timeout(Span(100, Millis))) { result =>
Expand Down Expand Up @@ -672,7 +765,9 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {
case _ => throw new Exception("a dumb thing to do")
}.build())
.foreach { cb =>
def protectedOperation() = cb() { 1 }
def protectedOperation() = cb() {
1
}

// this should not blow up with an exception
whenReady(Future(protectedOperation(), timeout(Span(100, Millis)))) {
Expand Down Expand Up @@ -756,7 +851,9 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {
case _ => throw new Exception("a dumb thing to do")
}.build())
.foreach { cb =>
def protectedOperation() = cb.async() { Future(1) }
def protectedOperation() = cb.async() {
Future(1)
}

// this should not blow up with an exception
whenReady(protectedOperation(), timeout(Span(100, Millis))) { result =>
Expand Down Expand Up @@ -936,7 +1033,9 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures {
"circuit breaker exception checking" should "let ControlThrowable exceptions through without affecting the state of the circuit breaker" in {
val circuitBreaker = CircuitBreakerBuilder("controlThrowable", 2, defaultRetryDelay).build()

val protectedOperation = circuitBreaker() { new ControlThrowable {} }
val protectedOperation = circuitBreaker() {
new ControlThrowable {}
}

// the circuit breaker is never tripped
(1 to 5).foreach { _ =>
Expand Down