Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 简单的适配下消息段 & 优化下发语音的代码 #109

Merged
merged 1 commit into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/common/utils/array2cqcode/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import json

from .message_segment import BaseMessageSegment
from typing import Any


def try_convert_to_cqcode(data: Any) -> str | Any:
try:
msg = json.loads(data)
if not isinstance(msg, list):
return msg
except TypeError:
if not isinstance(data, list):
return data
msg = data
except json.JSONDecodeError:
return data
except:
return data
cqmessage = ''
for seg in msg:
cqmessage += BaseMessageSegment(**seg).cqcode
return cqmessage
23 changes: 23 additions & 0 deletions src/common/utils/array2cqcode/message_segment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
class BaseMessageSegment:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)

@property
def cqcode(self):
if self.type == "text":
return self.data.get("text")
message = f"[CQ:{self.type}"
for k, v in self.__dict__.get("data").items():
message += f",{k}={self.escape(v)}"
message += "]"
return message

@staticmethod
def escape(data: str) -> str:
return (
data.replace("&", "&")
.replace("[", "[")
.replace("]", "]")
.replace(",", ",")
)
13 changes: 7 additions & 6 deletions src/plugins/greeting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ async def handle_first_receive(bot: Bot, event: GroupMessageEvent, state: T_Stat
return
config.refresh_cooldown('call_me')

with open(Path(wiki.get_random_voice(operator, greeting_voices)), 'rb') as f:
data = f.read()
msg: Message = MessageSegment.record(file=data)
msg: Message = MessageSegment.record(
file=Path(wiki.get_random_voice(operator, greeting_voices)).read_bytes())
await call_me_cmd.finish(msg)


to_me_cmd = on_message(
rule=to_me(),
priority=14,
Expand All @@ -87,9 +87,10 @@ async def handle_first_receive(bot: Bot, event: GroupMessageEvent, state: T_Stat

if len(event.get_plaintext().strip()) == 0 and not event.reply:
msg: Message = MessageSegment.record(
file=Path(wiki.get_random_voice(operator, greeting_voices)))
file=Path(wiki.get_random_voice(operator, greeting_voices)).read_bytes())
await to_me_cmd.finish(msg)


all_notice = on_notice(
priority=14,
block=False)
Expand Down Expand Up @@ -122,12 +123,12 @@ async def handle_first_receive(bot: Bot, event: Event, state: T_State):

elif event.notice_type == 'group_admin' and event.sub_type == 'set' and event.user_id == event.self_id:
msg: Message = MessageSegment.record(
file=Path(wiki.get_voice_filename(operator, '任命助理')))
file=Path(wiki.get_voice_filename(operator, '任命助理')).read_bytes())
await all_notice.finish(msg)

elif event.notice_type == 'friend_add':
msg: Message = MessageSegment.record(
file=Path(wiki.get_voice_filename(operator, '精英化晋升2')))
file=Path(wiki.get_voice_filename(operator, '精英化晋升2')).read_bytes())
await all_notice.finish(msg)

# 被禁言自动退群
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/repeater/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from nonebot.permission import SUPERUSER
from src.common.config import BotConfig
from src.common.utils.media_cache import insert_image, get_image
from src.common.utils.array2cqcode import try_convert_to_cqcode

from .model import Chat

Expand Down Expand Up @@ -155,7 +156,7 @@ async def is_reply(bot: Bot, event: GroupMessageEvent, state: T_State) -> bool:

@ban_msg.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
if '[CQ:reply,' not in event.raw_message:
if '[CQ:reply,' not in try_convert_to_cqcode(event.raw_message):
return False

raw_message = ''
Expand Down Expand Up @@ -219,7 +220,7 @@ async def _(bot: Bot, event: GroupRecallNoticeEvent, state: T_State):

raw_message = ''
# 使用get_msg得到的消息不是消息序列,使用正则生成一个迭代对象
for item in re.compile(r'\[[^\]]*\]|\w+').findall(msg['message']):
for item in re.compile(r'\[[^\]]*\]|\w+').findall(try_convert_to_cqcode(msg['message'])):
raw_reply = str(item)
# 去掉图片消息中的 url, subType 等字段
raw_message += re.sub(r'(\[CQ\:.+)(?:,url=*)(\])',
Expand Down
Loading