Skip to content

Scala: Context propagation and async interceptors #2984

@ostronom

Description

@ostronom

I want to pass some values from interceptor to rpc handler. I've read that this can be done with contexts. But the problem is, that my interceptor is asynchronous, i.e. it "waits" for the future to resolve before calling next listener. The context is lost in this situation. My code is in Scala:

case class AsyncContextawareInterceptor[A](
    f: Metadata  Future[Either[Status, (Context.Key[A], A)]]
)(implicit val system: ActorSystem)
    extends ServerInterceptor
    with AnyLogging {
  import system.dispatcher

  sealed trait Msg
  case object HalfClose extends Msg
  case object Cancel extends Msg
  case object Complete extends Msg
  case object Ready extends Msg
  case class Message[T](msg: T) extends Msg

  override def interceptCall[ReqT, RespT](call: ServerCall[ReqT, RespT],
                                          headers: Metadata,
                                          next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] =
    new ServerCall.Listener[ReqT] {
      private val stash = new java.util.concurrent.ConcurrentLinkedQueue[Msg]()
      private var interceptor: Option[ServerCall.Listener[ReqT]] = None

      private def enqueueAndProcess(msg: Msg) =
        if (interceptor.isDefined) processMessage(msg) else stash.add(msg)

      private def processMessage(msg: Msg) = msg match {
        case HalfClose  interceptor.foreach(_.onHalfClose)
        case Cancel  interceptor.foreach(_.onCancel)
        case Complete  interceptor.foreach(_.onComplete)
        case Ready  interceptor.foreach(_.onReady)
        case Message(msg: ReqT @unchecked)  interceptor.foreach(_.onMessage(msg))
      }

      private def processMessages() = while (!stash.isEmpty) {
        Option(stash.poll).foreach(processMessage)
      }

      override def onHalfClose(): Unit = enqueueAndProcess(HalfClose)

      override def onCancel(): Unit = enqueueAndProcess(Cancel)

      override def onComplete(): Unit = enqueueAndProcess(Complete)

      override def onReady(): Unit = enqueueAndProcess(Ready)

      override def onMessage(message: ReqT): Unit = enqueueAndProcess(Message(message))

      f(headers).map {
        case Right((k, v)) 
          val context = Context.current.withValue(k, v)
          interceptor = Some(Contexts.interceptCall(context, call, headers, next))
          processMessages()
        case Left(status)  call.close(status, new Metadata())
      }.recover {
        case t: Throwable 
          log.error(t, "AsyncContextawareInterceptor future failed")
          call.close(Status.fromThrowable(t), new Metadata())
      }
    }
}

object AuthInterceptor {
  val BOTID_CONTEXT_KEY: Context.Key[Int] = Context.key[Int]("botId")
  val TOKEN_HEADER_KEY: Metadata.Key[String] = Metadata.Key.of[String]("token", Metadata.ASCII_STRING_MARSHALLER)

  def authInterceptor(resolver: String  Future[Option[Int]])(implicit system: ActorSystem): ServerInterceptor =
    AsyncContextawareInterceptor { metadata 
      import system.dispatcher
      (for {
        token  OptionT.fromOption[Future](Option(metadata.get(TOKEN_HEADER_KEY)))
        botId  OptionT(resolver(token))
      } yield botId).value.map {
        case Some(id)  Right(BOTID_CONTEXT_KEY  id)
        case None  Left(Status.PERMISSION_DENIED)
      }
    }
}

The problem is that BOTID_CONTEXT_KEY.get is null in RPC handler, even when the future was resolved and the not-null value was set.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions