Skip to content

: this will live in redis once testing is complete #331

@github-actions

Description

@github-actions

# TODO:: this will live in redis once testing is complete

require "./application"

module PlaceOS::Api
  class WebRTC < Application
    base "/api/engine/v2/webrtc/"

    # TODO:: this will live in redis once testing is complete
    class CallDetails
      include JSON::Serializable

      getter id : String
      getter peers : Hash(String, HTTP::WebSocket)

      @[JSON::Field(converter: Time::EpochConverter)]
      getter created_at : Time

      @[JSON::Field(converter: Time::EpochConverter)]
      property updated_at : Time

      def initialize(@id : String)
        @peers = {} of String => HTTP::WebSocket
        @updated_at = @created_at = Time.utc
      end
    end

    enum SignalType
      Join
      ParticipantList
      Candidate
      Offer
      Answer
      Ping
      Leave
    end

    struct SessionSignal
      include JSON::Serializable

      # message id, generated by the sender
      getter id : String

      # the unique id of the room to join
      getter session_id : String

      # the type of message
      property type : SignalType

      # the id of the current user
      property! user_id : String

      # the id of the user we want to communicate with
      getter to_user : String?

      # the payload, if any
      @[JSON::Field(converter: String::RawConverter)]
      getter value : String?

      def initialize(@id, @session_id, @type, @user_id, @to_user, @value)
      end
    end

    # use a manager so the we can free the request context objects
    class Manager
      Log = ::Log.for(self)

      def initialize(@ice_config)
      end

      # authority_id => config string
      private getter ice_config : Hash(String, String)
      private getter lock : Mutex = Mutex.new
      private getter calls = {} of String => CallDetails
      private getter sockets = {} of HTTP::WebSocket => SessionSignal

      def handle_session(websocket, request_id, user_id, auth_id)
        websocket.on_message do |message|
          Log.context.set(request_id: request_id, user_id: user_id)
          Log.trace { {frame: "TEXT", text: message} }

          signal = SessionSignal.from_json(message)
          signal.user_id = user_id

          case signal.type
          when .join?
            on_join_signal(websocket, signal, auth_id)
          when .offer?, .answer?, .candidate?
            forward_signal(signal)
          else
            Log.warn { "user #{user_id} sent unsupported signal #{signal.type}" }
          end

          if call = calls[signal.session_id]?
            call.updated_at = Time.utc
          end
        end

        websocket.on_close do |_|
          Log.trace { {request_id: request_id, frame: "CLOSE"} }

          if connect_details = sockets.delete websocket
            if call = calls.delete(connect_details.session_id)
              call.peers.delete connect_details.user_id
              call.updated_at = Time.utc

              connect_details.type = :leave
              call.peers.each_value do |ws|
                send_signal(ws, connect_details)
              end
            end
          end
        end
      end

      def create_new_call(signal) : CallDetails
        calls[signal.session_id] = CallDetails.new(signal.session_id)
      end

      def send_signal(websocket, signal)
        Log.trace { "Sending signal #{signal.type} to #{signal.session_id}" }
        websocket.send(signal.to_json)
      rescue
        # we'll ignore websocket send failures, the user will be cleaned up
      end

      def on_join_signal(websocket, signal, auth_id)
        call = calls[signal.session_id]? || create_new_call(signal)
        call.peers[signal.user_id] = websocket
        sockets[websocket] = signal

        # Return RTC configuration details
        send_signal(websocket, SessionSignal.new(
          id: "SIGNAL::#{Time.utc.to_unix_ms}+#{Random::Secure.hex(6)}",
          type: :join,
          session_id: signal.session_id,
          user_id: "SERVER::DATA",
          to_user: signal.user_id,
          value: ice_config[auth_id]
        ))

        # Send participant list
        send_signal(websocket, SessionSignal.new(
          id: "SIGNAL::#{Time.utc.to_unix_ms}+#{Random::Secure.hex(6)}",
          type: :participant_list,
          session_id: signal.session_id,
          user_id: "SERVER::DATA",
          to_user: signal.user_id,
          value: call.peers.keys.to_json
        ))
      end

      def forward_signal(signal)
        if call = calls[signal.session_id]?
          if to_user = call.peers[signal.to_user]?
            send_signal(to_user, signal)
          end
        end
      end
    end

    ICE_CONFIG = {} of String => String
    MANAGER    = Manager.new(ICE_CONFIG)

    # WebRTC signaller endpoint, managing call participants
    @[AC::Route::WebSocket("/signaller")]
    def signaller(websocket) : Nil
      Log.trace { {request_id: request_id, frame: "OPEN"} }

      authority = current_authority.not_nil!
      auth_id = authority.id.as(String)

      # https://developer.mozilla.org/en-US/docs/Web/API/RTCIceServer
      ICE_CONFIG[auth_id] = authority.internals["webrtc_ice"]?.try(&.to_json) || WEBRTC_DEFAULT_ICE_CONFIG

      MANAGER.handle_session(websocket, request_id, user_token.id, auth_id)
    end
  end
end

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions