@@ -56,8 +56,7 @@ class SparkConnectClientRetriesSuite
5656 private def createTestExceptionWithDetails (
5757 msg : String ,
5858 code : Status .Code = Status .Code .INTERNAL ,
59- retryDelay : FiniteDuration = FiniteDuration (0 , " s" )
60- ): StatusRuntimeException = {
59+ retryDelay : FiniteDuration = FiniteDuration (0 , " s" )): StatusRuntimeException = {
6160 // In grpc-java, RetryDelay should be specified as seconds: Long + nanos: Int
6261 val seconds = retryDelay.toSeconds
6362 val nanos = (retryDelay - FiniteDuration (seconds, " s" )).toNanos.toInt
@@ -83,8 +82,7 @@ class SparkConnectClientRetriesSuite
8382 private def assertLongSequencesAlmostEqual (
8483 first : Seq [Long ],
8584 second : Seq [Long ],
86- delta : Long
87- ): Unit = {
85+ delta : Long ): Unit = {
8886 assert(first.length == second.length, " Lists have different lengths." )
8987 for ((a, b) <- first.zip(second)) {
9088 assert(math.abs(a - b) <= delta, s " Elements $a and $b differ by more than $delta. " )
@@ -199,10 +197,8 @@ class SparkConnectClientRetriesSuite
199197 }
200198 test(" DefaultPolicy retries exceptions with RetryInfo" ) {
201199 // Error contains RetryInfo with retry_delay set to 0
202- val dummyFn = new DummyFn (
203- createTestExceptionWithDetails(msg = " Some error message" ),
204- numFails = 100
205- )
200+ val dummyFn =
201+ new DummyFn (createTestExceptionWithDetails(msg = " Some error message" ), numFails = 100 )
206202 val retryPolicies = RetryPolicy .defaultPolicies()
207203 val retryHandler = new GrpcRetryHandler (retryPolicies, sleep = _ => {})
208204 assertThrows[RetriesExceeded ] {
@@ -218,12 +214,8 @@ class SparkConnectClientRetriesSuite
218214 val st = new SleepTimeTracker ()
219215 val retryDelay = FiniteDuration (5 , " min" )
220216 val dummyFn = new DummyFn (
221- createTestExceptionWithDetails(
222- msg = " Some error message" ,
223- retryDelay = retryDelay
224- ),
225- numFails = 100
226- )
217+ createTestExceptionWithDetails(msg = " Some error message" , retryDelay = retryDelay),
218+ numFails = 100 )
227219 val retryPolicies = RetryPolicy .defaultPolicies()
228220 val retryHandler = new GrpcRetryHandler (retryPolicies, sleep = st.sleep)
229221
@@ -243,12 +235,8 @@ class SparkConnectClientRetriesSuite
243235 val st = new SleepTimeTracker ()
244236 val retryDelay = FiniteDuration (5 , " d" )
245237 val dummyFn = new DummyFn (
246- createTestExceptionWithDetails(
247- msg = " Some error message" ,
248- retryDelay = retryDelay
249- ),
250- numFails = 100
251- )
238+ createTestExceptionWithDetails(msg = " Some error message" , retryDelay = retryDelay),
239+ numFails = 100 )
252240 val retryPolicies = RetryPolicy .defaultPolicies()
253241 val retryHandler = new GrpcRetryHandler (retryPolicies, sleep = st.sleep)
254242
@@ -271,14 +259,10 @@ class SparkConnectClientRetriesSuite
271259 List .fill(2 )(
272260 createTestExceptionWithDetails(
273261 msg = " Some error message" ,
274- retryDelay = retryDelay
275- )
276- ) ++ List .fill(3 )(
262+ retryDelay = retryDelay)) ++ List .fill(3 )(
277263 createTestExceptionWithDetails(
278264 msg = " Some error message" ,
279- code = Status .Code .UNAVAILABLE
280- )
281- )
265+ code = Status .Code .UNAVAILABLE ))
282266 ).iterator
283267
284268 retryHandler.retry({
@@ -290,9 +274,8 @@ class SparkConnectClientRetriesSuite
290274
291275 // Should be retried by DefaultPolicy
292276 val policy = retryPolicies.find(_.name == " DefaultPolicy" ).get
293- val expectedSleeps = List .fill(2 )(retryDelay.toMillis) ++ List .tabulate(3 )(
294- i => policy.initialBackoff.toMillis * math.pow(policy.backoffMultiplier, i + 2 ).toLong
295- )
277+ val expectedSleeps = List .fill(2 )(retryDelay.toMillis) ++ List .tabulate(3 )(i =>
278+ policy.initialBackoff.toMillis * math.pow(policy.backoffMultiplier, i + 2 ).toLong)
296279 assertLongSequencesAlmostEqual(st.times, expectedSleeps, delta = policy.jitter.toMillis)
297280 }
298281}
0 commit comments