Skip to content

Commit 755478d

Browse files
ChatManager to manage sending/receiving of chat messages (#143)
Co-authored-by: Théo Monnom <[email protected]>
1 parent ede169c commit 755478d

File tree

5 files changed

+205
-1
lines changed

5 files changed

+205
-1
lines changed

.github/workflows/tests.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ jobs:
1717
- uses: actions/checkout@v3
1818
with:
1919
submodules: true
20+
lfs: true
2021

2122
- uses: actions/setup-python@v4
2223
- name: Run tests
2324
run: |
25+
python3 ./livekit-rtc/rust-sdks/download_ffi.py --output livekit-rtc/livekit/rtc/resources
2426
pip3 install pytest ./livekit-protocol ./livekit-api ./livekit-rtc
25-
pytest . --ignore=livekit-rtc/rust-sks
27+
pytest . --ignore=livekit-rtc/rust-sdks

livekit-rtc/livekit/rtc/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
)
7373
from .video_source import VideoSource
7474
from .video_stream import VideoStream
75+
from .chat import ChatManager, ChatMessage
7576

7677
from .version import __version__
7778

@@ -134,5 +135,7 @@
134135
"VideoFrameBuffer",
135136
"VideoSource",
136137
"VideoStream",
138+
"ChatManager",
139+
"ChatMessage",
137140
"__version__",
138141
]

livekit-rtc/livekit/rtc/_utils.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,22 @@
1+
# Copyright 2023 LiveKit, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import asyncio
216
import logging
317
from collections import deque
418
import ctypes
19+
import random
520
from typing import Callable, Generic, List, TypeVar
621

722
logger = logging.getLogger("livekit")
@@ -101,3 +116,17 @@ async def join(self) -> None:
101116
subs = self._subscribers.copy()
102117
for queue in subs:
103118
await queue.join()
119+
120+
121+
_base62_characters = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
122+
123+
124+
def generate_random_base62(length=12):
125+
"""
126+
Generate a random base62 encoded string of a specified length.
127+
128+
:param length: The desired length of the base62 encoded string.
129+
:return: A base62 encoded string.
130+
"""
131+
global _base62_characters
132+
return "".join(random.choice(_base62_characters) for _ in range(length))

livekit-rtc/livekit/rtc/chat.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# Copyright 2023 LiveKit, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from dataclasses import dataclass, field
16+
from datetime import datetime
17+
import json
18+
import logging
19+
from typing import Any, Callable, Dict, Literal, Optional
20+
21+
from .room import Room, Participant, DataPacket
22+
from ._event_emitter import EventEmitter
23+
from ._proto.room_pb2 import DataPacketKind
24+
from ._utils import generate_random_base62
25+
26+
_CHAT_TOPIC = "lk-chat-topic"
27+
_CHAT_UPDATE_TOPIC = "lk-chat-update-topic"
28+
29+
EventTypes = Literal["message_received",]
30+
31+
32+
class ChatManager(EventEmitter[EventTypes]):
33+
"""A utility class that sends and receives chat messages in the active session.
34+
35+
It implements LiveKit Chat Protocol, and serializes data to/from JSON data packets.
36+
"""
37+
38+
def __init__(self, room: Room):
39+
super().__init__()
40+
self._lp = room.local_participant
41+
self._room = room
42+
43+
room.on("data_received", self._on_data_received)
44+
45+
def close(self):
46+
self._room.off("data_received", self._on_data_received)
47+
48+
async def send_message(self, message: str) -> "ChatMessage":
49+
"""Send a chat message to the end user using LiveKit Chat Protocol.
50+
51+
Args:
52+
message (str): the message to send
53+
54+
Returns:
55+
ChatMessage: the message that was sent
56+
"""
57+
msg = ChatMessage(
58+
message=message,
59+
is_local=True,
60+
participant=self._lp,
61+
)
62+
await self._lp.publish_data(
63+
payload=json.dumps(msg.asjsondict()),
64+
kind=DataPacketKind.KIND_RELIABLE,
65+
topic=_CHAT_TOPIC,
66+
)
67+
return msg
68+
69+
async def update_message(self, message: "ChatMessage"):
70+
"""Update a chat message that was previously sent.
71+
72+
If message.deleted is set to True, we'll signal to remote participants that the message
73+
should be deleted.
74+
"""
75+
await self._lp.publish_data(
76+
payload=json.dumps(message.asjsondict()),
77+
kind=DataPacketKind.KIND_RELIABLE,
78+
topic=_CHAT_UPDATE_TOPIC,
79+
)
80+
81+
def on_message(self, callback: Callable[["ChatMessage"], None]):
82+
"""Register a callback to be called when a chat message is received from the end user."""
83+
self._callback = callback
84+
85+
def _on_data_received(self, dp: DataPacket):
86+
# handle both new and updates the same way, as long as the ID is in there
87+
# the user can decide how to replace the previous message
88+
if dp.topic == _CHAT_TOPIC or dp.topic == _CHAT_UPDATE_TOPIC:
89+
try:
90+
parsed = json.loads(dp.data)
91+
msg = ChatMessage.from_jsondict(parsed)
92+
if dp.participant:
93+
msg.participant = dp.participant
94+
self.emit("message_received", msg)
95+
except Exception as e:
96+
logging.warning("failed to parse chat message: %s", e, exc_info=e)
97+
98+
99+
@dataclass
100+
class ChatMessage:
101+
message: Optional[str] = None
102+
id: str = field(default_factory=generate_random_base62)
103+
timestamp: datetime = field(default_factory=datetime.now)
104+
deleted: bool = field(default=False)
105+
106+
# These fields are not part of the wire protocol. They are here to provide
107+
# context for the application.
108+
participant: Optional[Participant] = None
109+
is_local: bool = field(default=False)
110+
111+
@classmethod
112+
def from_jsondict(cls, d: Dict[str, Any]) -> "ChatMessage":
113+
# older version of the protocol didn't contain a message ID, so we'll create one
114+
id = d.get("id") or generate_random_base62()
115+
timestamp = datetime.now()
116+
if d.get("timestamp"):
117+
timestamp = datetime.fromtimestamp(d.get("timestamp", 0) / 1000.0)
118+
msg = cls(
119+
id=id,
120+
timestamp=timestamp,
121+
)
122+
msg.update_from_jsondict(d)
123+
return msg
124+
125+
def update_from_jsondict(self, d: Dict[str, Any]) -> None:
126+
self.message = d.get("message")
127+
self.deleted = d.get("deleted", False)
128+
129+
def asjsondict(self):
130+
"""Returns a JSON serializable dictionary representation of the message."""
131+
d = {
132+
"id": self.id,
133+
"message": self.message,
134+
"timestamp": int(self.timestamp.timestamp() * 1000),
135+
}
136+
if self.deleted:
137+
d["deleted"] = True
138+
return d

livekit-rtc/tests/test_chat.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from datetime import datetime
2+
import json
3+
4+
from livekit.rtc import ChatMessage
5+
6+
7+
def test_message_basics():
8+
msg = ChatMessage()
9+
assert msg.id is not None, "message id should be set"
10+
assert msg.timestamp is not None, "timestamp should be set"
11+
assert msg.timestamp.day == datetime.now().day, "timestamp should be today"
12+
assert len(msg.id) > 5, "message id should be long enough"
13+
14+
15+
def test_message_serialization():
16+
msg = ChatMessage(
17+
message="hello",
18+
)
19+
data = msg.asjsondict()
20+
msg2 = ChatMessage.from_jsondict(json.loads(json.dumps(data)))
21+
assert msg2.message == msg.message, "message should be the same"
22+
assert msg2.id == msg.id, "id should be the same"
23+
assert int(msg2.timestamp.timestamp() / 1000) == int(
24+
msg.timestamp.timestamp() / 1000
25+
), "timestamp should be the same"
26+
assert not msg2.deleted, "not deleted"
27+
28+
# deletion is handled
29+
msg.deleted = True
30+
data = msg.asjsondict()
31+
msg2 = ChatMessage.from_jsondict(json.loads(json.dumps(data)))
32+
assert msg2.deleted, "should be deleted"

0 commit comments

Comments
 (0)