diff --git a/ssrspeed/speedtest/method/st_stream.py b/ssrspeed/speedtest/method/st_stream.py index 73ddcb3..32b7297 100644 --- a/ssrspeed/speedtest/method/st_stream.py +++ b/ssrspeed/speedtest/method/st_stream.py @@ -9,25 +9,7 @@ nf_ip_re = re.compile(r'"requestIpAddress":"(.*)"') -def retry(count=5): - def wrapper(func): - async def inner(*args, **kwargs): - for _ in range(count): - result = await func(*args, **kwargs) - if result is True: - break - else: - return False - return True - - return inner - - return wrapper - - class StreamTest: - session = None - @classmethod async def netflix(cls, host, headers, inner_dict, port, outbound_ip): logger.info(f"Performing netflix test LOCAL_PORT: {port}.") @@ -73,53 +55,44 @@ async def netflix(cls, host, headers, inner_dict, port, outbound_ip): logger.error(f"Netflix error: {str(e)}") return {} - @classmethod - @retry(5) - async def netflix_inner_method_1(cls, outbound_ip, inner_dict): - async with cls.session.get( - url="https://www.netflix.com/title/70143836" - ) as response: - if response.status != 200: - return False - text = str(await response.read()) - locate = text.find("preferredLocale") - netflix_ip = nf_ip_re.findall(text)[0].split(",")[0] - logger.info(f"Netflix IP : {netflix_ip}") - region = text[locate + 29 : locate + 31] if locate > 0 else "Unknown" - if outbound_ip == netflix_ip: - logger.info("Netflix test result: Full Native.") - inner_dict["Ntype"] = f"Full Native({region})" - else: - logger.info("Netflix test result: Full DNS.") - inner_dict["Ntype"] = f"Full DNS({region})" - return True - - @classmethod - @retry(5) - async def netflix_inner_method_2(cls, inner_dict): - async with cls.session.get( - url="https://www.netflix.com/title/70242311" - ) as response: - if response.status == 200: - logger.info("Netflix test result: Only Original.") - inner_dict["Ntype"] = "Only Original" - return True - logger.info("Netflix test result: None.") - inner_dict["Ntype"] = "None" - return False - @classmethod async def netflix_new(cls, host, headers, inner_dict, port, outbound_ip): logger.info(f"Performing netflix(new) test LOCAL_PORT: {port}.") try: - async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port), - timeout=aiohttp.ClientTimeout(connect=10), - ) as cls.session: - result = await cls.netflix_inner_method_1(outbound_ip, inner_dict) - if result is False: - await cls.netflix_inner_method_2(inner_dict) + async with ( + aiohttp.ClientSession( + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=10), + ) as session, + session.get( + url="https://www.netflix.com/title/70143836" # "https://www.netflix.com/title/70242311" + ) as response1, + ): + if response1.status == 200: + text = str(await response1.read()) + locate = text.find("preferredLocale") + netflix_ip = nf_ip_re.findall(text)[0].split(",")[0] + logger.info(f"Netflix IP : {netflix_ip}") + region = text[locate + 29 : locate + 31] if locate > 0 else "未知" + if outbound_ip == netflix_ip: + logger.info("Netflix test result: Full Native.") + inner_dict["Ntype"] = f"Full Native({region})" + else: + logger.info("Netflix test result: Full DNS.") + inner_dict["Ntype"] = f"Full DNS({region})" + return + async with session.get( + url="https://www.netflix.com/title/70242311" + ) as response2: + rg = "" + if response2.status == 200: + logger.info("Netflix test result: Only Original.") + inner_dict["Ntype"] = "Only Original" + else: + logger.info("Netflix test result: None.") + inner_dict["Ntype"] = "None" + # 测试连接状态 except Exception as e: logger.error(f"Netflix error: {str(e)}") return {} @@ -376,7 +349,7 @@ async def start_stream_test(port, stream_cfg, outbound_ip): if stream_cfg["NETFLIX_TEST"]: test_list.append( asyncio.create_task( - StreamTest.netflix_new(host, headers, inner_dict, port, outbound_ip) + StreamTest.netflix(host, headers, inner_dict, port, outbound_ip) ) ) await asyncio.wait(test_list) diff --git a/ssrspeed/util/emo.py b/ssrspeed/util/emo.py index bbeb42c..5f15639 100644 --- a/ssrspeed/util/emo.py +++ b/ssrspeed/util/emo.py @@ -1,11 +1,17 @@ +import abc +import asyncio import contextlib +import os import re +import shutil from io import BytesIO from typing import ClassVar, Optional import pilmoji import requests +from aiohttp import ClientSession from emoji import demojize +from loguru import logger from unidecode import unidecode @@ -105,6 +111,121 @@ class TossFacePediaSource(EmojiPediaSource): STYLE = "toss-face/342/" +class LocalSource(pilmoji.source.BaseSource): + """ + emoji本地源基类 + """ + + def get_emoji(self, emoji: str, /) -> Optional[BytesIO]: + file_path = self.get_file_path(emoji) + with contextlib.suppress(FileNotFoundError): + with open(file_path, "rb") as file: + return BytesIO(file.read()) + return None + + def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]: + raise NotImplementedError + + @abc.abstractmethod + def get_file_path(self, emoji: str) -> str: + return "" + + @abc.abstractmethod + def download_emoji(self, download_url): + pass + + +class OpenmojiLocalSource(LocalSource): + """ + 图片源:https://github.com/hfg-gmuend/openmoji/tree/master/color/72x72 + 安装路径:./resources/emoji/openmoji + """ + + def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]: + pass + + def get_file_path(self, emoji: str) -> str: + code_points = [f"{ord(c):04X}" for c in emoji] + return f"./resources/emoji/openmoji/{'-'.join(code_points)}.png" + + def download_emoji(self, download_url): + pass + + +class TwemojiLocalSource(LocalSource): + """ + 图片源:https://github.com/twitter/twemoji/tree/master/assets/72x72 + 安装路径:./resources/emoji/twemoji + """ + + def __init__(self, init: str = None, proxy=None): + """ + 构造函数中,如果init不为none,则提供下载emoji资源包的url地址 + """ + self.savepath = "./resources/emoji/twemoji.zip" + self._download_url = ( + "https://github.com/twitter/twemoji/archive/refs/tags/v14.0.2.zip" + ) + if init is None: + return + self.download_emoji(init, proxy=proxy) + self.init_emoji(self.savepath) + + @property + def download_url(self): + return self._download_url + + @staticmethod + def init_emoji(savepath: str): + # 解压下载好的文件 + shutil.unpack_archive(savepath, "./resources/emoji/", format="zip") + # print("解压完成") + # 重命名 + dirs = os.listdir("./resources/emoji/") + for d in dirs: + if d.startswith("twemoji") and not d.endswith(".zip"): + os.rename( + os.path.join(os.path.abspath("./resources/emoji/"), d), + os.path.join(os.path.abspath("./resources/emoji/"), "twemoji"), + ) + break + return os.path.isdir("./resources/emoji/twemoji") + + async def download_emoji( + self, + download_url: str = None, + savepath="./resources/emoji/twemoji.zip", + proxy=None, + ): + # 如果本地已存在,便无需重新下载 + if os.path.isdir("./resources/emoji/twemoji"): + return + _url = ( + self.download_url if download_url is None else download_url + ) # 如果没有提供下载地址则用默认的 + print("Download URL:", _url) + # 从网络上下载 + async with ClientSession(headers={"user-agent": "SSRSpeedN"}) as session: + async with session.get(_url, proxy=proxy, timeout=20) as resp: + if resp.status != 200: + raise Exception(f"NetworkError: {resp.status}==>\t{_url}") + with open(savepath, "wb") as f: + while True: + block = await resp.content.read(1024) + if not block: + break + f.write(block) + + def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]: + pass + + def get_file_path(self, emoji: str) -> str: + code_points = [f"{ord(c):x}" for c in emoji] + if emoji in {"4️⃣", "6️⃣"}: + del code_points[1] + return f"./resources/emoji/twemoji/assets/72x72/{'-'.join(code_points)}.png" + + __all__ = [ "ApplePediaSource", "GooglePediaSource", @@ -117,9 +238,10 @@ class TossFacePediaSource(EmojiPediaSource): "SkypePediaSource", "JoyPixelsPediaSource", "TossFacePediaSource", + "TwemojiLocalSource", + "OpenmojiLocalSource", ] - if __name__ == "__main__": from PIL import Image, ImageFont @@ -127,6 +249,19 @@ class TossFacePediaSource(EmojiPediaSource): Hello, world! 👋 Here are some flags: 🇧🇦 🇷🇪 🇨🇼 🇺🇲 """ + def check_init(): + if not os.path.isdir("./resources/emoji/twemoji"): + twemoji = TwemojiLocalSource() + print("检测到未安装emoji资源包,正在初始化本地emoji...") + asyncio.get_event_loop().run_until_complete( + twemoji.download_emoji(proxy=None) + ) + if twemoji.init_emoji(twemoji.savepath): + logger.info("初始化emoji成功") + else: + logger.warning("初始化emoji失败") + + # check_init() with Image.new("RGB", (550, 80), (255, 255, 255)) as image: font = ImageFont.truetype("arial.ttf", 24)