@@ -17,8 +17,8 @@ import java.util.concurrent.Future
1717import java.util.concurrent.TimeUnit
1818import javax.persistence.OptimisticLockException
1919import kotlin.reflect.KClass
20- import misk.backoff.ExponentialBackoff
2120import misk.concurrent.ExecutorServiceFactory
21+ import misk.jdbc.TransactionRetryHandler
2222import misk.hibernate.advisorylocks.tryAcquireLock
2323import misk.hibernate.advisorylocks.tryReleaseLock
2424import misk.jdbc.CheckDisabler
@@ -59,6 +59,11 @@ private constructor(
5959 private val shardListFetcher: ShardListFetcher ,
6060 private val hibernateEntities: Set <HibernateEntity >,
6161) : Transacter {
62+
63+ private val retryHandler = TransactionRetryHandler (
64+ qualifierName = qualifier.simpleName ? : " hibernate" ,
65+ exceptionClassifier = HibernateExceptionClassifier ()
66+ )
6267 constructor (
6368 qualifier: KClass <out Annotation >,
6469 sessionFactoryService: SessionFactoryService ,
@@ -217,46 +222,13 @@ private constructor(
217222 }
218223
219224 private fun <T > transactionWithRetriesInternal (block : () -> T ): T {
220- require(options.maxAttempts > 0 )
221-
222- val backoff =
223- ExponentialBackoff (
224- Duration .ofMillis(options.minRetryDelayMillis),
225- Duration .ofMillis(options.maxRetryDelayMillis),
226- Duration .ofMillis(options.retryJitterMillis),
227- )
228- var attempt = 0
229-
230- while (true ) {
231- try {
232- attempt++
233- val result = block()
234-
235- if (attempt > 1 ) {
236- logger.info { " retried ${qualifier.simpleName} transaction succeeded (attempt $attempt )" }
237- }
238-
239- return result
240- } catch (e: Exception ) {
241- if (! isRetryable(e)) throw e
242-
243- if (attempt >= options.maxAttempts) {
244- logger.info {
245- " ${qualifier.simpleName} recoverable transaction exception " + " (attempt $attempt ), no more attempts"
246- }
247- throw e
248- }
249-
250- val sleepDuration = backoff.nextRetry()
251- logger.info(e) {
252- " ${qualifier.simpleName} recoverable transaction exception " +
253- " (attempt $attempt ), will retry after a $sleepDuration delay"
254- }
255-
256- if (! sleepDuration.isZero) {
257- Thread .sleep(sleepDuration.toMillis())
258- }
259- }
225+ return retryHandler.executeWithRetries(
226+ maxAttempts = options.maxAttempts,
227+ minRetryDelayMillis = options.minRetryDelayMillis,
228+ maxRetryDelayMillis = options.maxRetryDelayMillis,
229+ retryJitterMillis = options.retryJitterMillis
230+ ) {
231+ block()
260232 }
261233 }
262234
@@ -304,7 +276,7 @@ private constructor(
304276 return ConstraintViolationException (sqlException.message, sqlException, " " )
305277 }
306278 // write-write conflicts fail at COMMIT rather than waiting on a lock.
307- e.cause is SQLException && isTidbWriteConflict (e.cause as SQLException ) -> {
279+ e.cause is SQLException && (e.cause as SQLException ).errorCode == 9007 -> {
308280 val sqlException = e.cause as SQLException
309281 return ConstraintViolationException (sqlException.message, sqlException, " " )
310282 }
@@ -423,68 +395,7 @@ private constructor(
423395 }
424396 }
425397
426- private fun isRetryable (th : Throwable ): Boolean {
427- return when (th) {
428- is RetryTransactionException ,
429- is StaleObjectStateException ,
430- is LockAcquisitionException ,
431- is SQLRecoverableException ,
432- is SQLTransientException ,
433- is OptimisticLockException -> true
434- is SQLException -> if (isMessageRetryable(th)) true else isCauseRetryable(th)
435- else -> isCauseRetryable(th)
436- }
437- }
438-
439- private fun isMessageRetryable (th : SQLException ) =
440- isConnectionClosed(th) ||
441- isVitessTransactionNotFound(th) ||
442- isCockroachRestartTransaction(th) ||
443- isTidbWriteConflict(th)
444-
445- /* *
446- * This is thrown as a raw SQLException from Hikari even though it is most certainly a recoverable exception. See
447- * com/zaxxer/hikari/pool/ProxyConnection.java:493
448- */
449- private fun isConnectionClosed (th : SQLException ) = th.message.equals(" Connection is closed" )
450-
451- /* *
452- * We get this error as a MySQLQueryInterruptedException when a tablet gracefully terminates, we just need to retry
453- * the transaction and the new primary should handle it.
454- *
455- * ```
456- * vttablet: rpc error: code = Aborted desc = transaction 1572922696317821557:
457- * not found (CallerID: )
458- * ```
459- */
460- private fun isVitessTransactionNotFound (th : SQLException ): Boolean {
461- val message = th.message
462- return message != null &&
463- message.contains(" vttablet: rpc error" ) &&
464- message.contains(" code = Aborted" ) &&
465- message.contains(" transaction" ) &&
466- message.contains(" not found" )
467- }
468-
469- /* *
470- * "Messages with the error code 40001 and the string restart transaction indicate that a transaction failed because
471- * it conflicted with another concurrent or recent transaction accessing the same data. The transaction needs to be
472- * retried by the client." https://www.cockroachlabs.com/docs/stable/common-errors.html#restart-transaction
473- */
474- private fun isCockroachRestartTransaction (th : SQLException ): Boolean {
475- val message = th.message
476- return th.errorCode == 40001 && message != null && message.contains(" restart transaction" )
477- }
478-
479- /* *
480- * "Transactions in TiKV encounter write conflicts". This can happen when optimistic transaction mode is on. Conflicts
481- * are detected during transaction commit https://docs.pingcap.com/tidb/dev/tidb-faq#error-9007-hy000-write-conflict
482- */
483- private fun isTidbWriteConflict (th : SQLException ): Boolean {
484- return th.errorCode == 9007
485- }
486398
487- private fun isCauseRetryable (th : Throwable ) = th.cause?.let { isRetryable(it) } ? : false
488399
489400 // NB: all options should be immutable types as copy() is shallow.
490401 internal data class TransacterOptions (
0 commit comments