# executor.py
import html
import logging
from typing import AsyncGenerator, Literal

from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest
from yarl import URL

from podslv21_bot.config import Config

from .models import ModerationDecision, ModerationEvent
from .planner import ModerationPlanner


  9. class ModerationExecutor:
  10. def __init__(self,
  11. config: Config,
  12. bot: Bot,
  13. planner: ModerationPlanner):
  14. self._logger = logging.getLogger(__name__)
  15. self.config = config
  16. self.bot = bot
  17. self.planner = planner
  18. self.planner.set_functions(self.delete_message, self.moderation_decision)
  19. async def delete_message(self, message_link: str):
  20. """
  21. Deletes a message by message_link.
  22. Call this function only when the user explicitly requests to delete their own message.
  23. Do not use it for moderation or automatic cleanup.
  24. """
  25. parsed_url = URL(message_link)
  26. parsed_path = parsed_url.path.strip("/").split("/")
  27. moderation_chat_id = self.config.forwarding.moderation_chat_id
  28. publication_chat_id = self.config.forwarding.publication_chat_id
  29. if len(parsed_path) == 3 and parsed_path[0] == "c"\
  30. and parsed_path[1].replace("-100", "") == str(publication_chat_id).replace("-100", ""):
  31. message_id = parsed_path[2]
  32. try:
  33. await self.bot.delete_message(publication_chat_id, message_id)
  34. await self.bot.send_message(
  35. moderation_chat_id,
  36. f"<b>Исполнитель</b>: Удаление успешно для {message_id}.",
  37. parse_mode="HTML"
  38. )
  39. return ModerationEvent(type="delete_message", result=True)
  40. except TelegramBadRequest:
  41. await self.bot.send_message(
  42. moderation_chat_id,
  43. f"<b>Исполнитель</b>: Удаление окончилось ошибкой для {message_id}.",
  44. parse_mode="HTML"
  45. )
  46. return ModerationEvent(type="delete_message", result=False)
  47. async def moderation_decision(self, status: Literal["PASS", "REJECT"], explanation: str):
  48. """
  49. Processes a message with a moderation decision by status and explanation.
  50. This function must be called whenever there is no exact user request or no other available function
  51. matching the user's intent. Status must be either "PASS" if the message is allowed, or "REJECT" if it should be blocked.
  52. """
  53. moderation_chat_id = self.config.forwarding.moderation_chat_id
  54. human_status = "отправлено" if status == "PASS" else "отклонено"
  55. await self.bot.send_message(
  56. moderation_chat_id,
  57. f"<b>Исполнитель</b>: Сообщение {human_status}.\n\nОбъяснение: {explanation}",
  58. parse_mode="HTML"
  59. )
  60. if status not in ("PASS", "REJECT"):
  61. return ModerationEvent(type="moderation_decision", result=ModerationDecision(status="REJECT", explanation=explanation))
  62. return ModerationEvent(type="moderation_decision", result=ModerationDecision(status=status, explanation=explanation))
  63. async def process_message(self, message_text: str) -> AsyncGenerator[ModerationEvent, None]:
  64. functions = await self.planner.plan(message_text)
  65. function_names = self.planner.get_function_names()
  66. for func in functions:
  67. func_name = func.get("name")
  68. if hasattr(self, func_name) and func_name in function_names:
  69. try:
  70. self._logger.info(f"Executing {func_name}({', '.join(map(str, func.get('args')))})")
  71. yield await getattr(self, func_name)(*func.get("args"))
  72. except Exception:
  73. self._logger.exception(f"Failed to execute {func_name}({', '.join(map(str, func.get('args')))})")