|
|
@@ -10,35 +10,32 @@ from aiogram.enums import ChatType
|
|
|
from aiogram.types import Message
|
|
|
|
|
|
from anonflow.config.models import ForwardingType
|
|
|
-from anonflow.moderation import ModerationExecutor
|
|
|
-from anonflow.services.transport import MessageRouter
|
|
|
+from anonflow.interfaces import PostResponsesPort
|
|
|
+from anonflow.moderation import ModerationService
|
|
|
from anonflow.services.transport.content import (
|
|
|
- ContentMediaGroup,
|
|
|
+ ContentGroup,
|
|
|
ContentMediaItem,
|
|
|
MediaType
|
|
|
)
|
|
|
-from anonflow.services.transport.results import (
|
|
|
- ModerationDecisionResult,
|
|
|
- PostPreparedResult
|
|
|
-)
|
|
|
+from anonflow.services.transport.types import RequestContext
|
|
|
|
|
|
|
|
|
class MediaRouter(Router):
|
|
|
def __init__(
|
|
|
self,
|
|
|
- message_router: MessageRouter,
|
|
|
- forwarding_types: FrozenSet[ForwardingType],
|
|
|
- moderation_executor: ModerationExecutor,
|
|
|
+ responses_port: PostResponsesPort,
|
|
|
+ moderation_service: ModerationService,
|
|
|
+ forwarding_types: FrozenSet[ForwardingType]
|
|
|
):
|
|
|
super().__init__()
|
|
|
|
|
|
- self.message_router = message_router
|
|
|
- self.forwarding_types = forwarding_types
|
|
|
- self.moderation_executor = moderation_executor
|
|
|
+ self._responses_port = responses_port
|
|
|
+ self._moderation_service = moderation_service
|
|
|
+ self._forwarding_types = forwarding_types
|
|
|
|
|
|
- self.media_groups: Dict[str, List[Message]] = {}
|
|
|
- self.media_groups_tasks: Dict[str, asyncio.Task] = {}
|
|
|
- self.media_groups_lock = asyncio.Lock()
|
|
|
+ self._media_groups: Dict[str, List[Message]] = {}
|
|
|
+ self._media_groups_tasks: Dict[str, asyncio.Task] = {}
|
|
|
+ self._media_groups_lock = asyncio.Lock()
|
|
|
|
|
|
@staticmethod
|
|
|
async def get_b64image(message: Message):
|
|
|
@@ -53,72 +50,74 @@ class MediaRouter(Router):
|
|
|
|
|
|
def _can_send_media(self, msgs: List[Message]):
|
|
|
return any(
|
|
|
- (msg.photo and "photo" in self.forwarding_types) or
|
|
|
- (msg.video and "video" in self.forwarding_types)
|
|
|
+ (msg.photo and "photo" in self._forwarding_types) or
|
|
|
+ (msg.video and "video" in self._forwarding_types)
|
|
|
for msg in msgs
|
|
|
)
|
|
|
|
|
|
def _get_media(self, message: Message):
|
|
|
- if message.photo and "photo" in self.forwarding_types:
|
|
|
+ if message.photo and "photo" in self._forwarding_types:
|
|
|
return {"type": MediaType.PHOTO, "file_id": message.photo[-1].file_id}
|
|
|
- elif message.video and "video" in self.forwarding_types:
|
|
|
+ elif message.video and "video" in self._forwarding_types:
|
|
|
return {"type": MediaType.VIDEO, "file_id": message.video.file_id}
|
|
|
|
|
|
- def setup(self):
|
|
|
- async def process_messages(messages: List[Message]):
|
|
|
- if not messages:
|
|
|
- return
|
|
|
-
|
|
|
- if self._can_send_media(messages):
|
|
|
- moderation_approved = False
|
|
|
-
|
|
|
- content_group = ContentMediaGroup()
|
|
|
- caption = next((msg.caption for msg in messages if msg.caption), "")
|
|
|
- for message in messages:
|
|
|
- async for result in self.moderation_executor.process(
|
|
|
- message.caption,
|
|
|
- await self.get_b64image(message)
|
|
|
- ):
|
|
|
- if isinstance(result, ModerationDecisionResult):
|
|
|
- moderation_approved = result.is_approved
|
|
|
- await self.message_router.dispatch(result, message)
|
|
|
-
|
|
|
- media = self._get_media(message)
|
|
|
- if media:
|
|
|
- content_group.items.append(ContentMediaItem(**media, caption=caption))
|
|
|
-
|
|
|
- await self.message_router.dispatch(
|
|
|
- PostPreparedResult(content_group, moderation_approved),
|
|
|
- messages[0]
|
|
|
- )
|
|
|
+ async def _process_messages(self, messages: List[Message], user_language: str):
|
|
|
+ if not messages:
|
|
|
+ return
|
|
|
+
|
|
|
+ if not self._can_send_media(messages):
|
|
|
+ return
|
|
|
|
|
|
- @self.message(F.photo | F.video)
|
|
|
- async def on_photo(message: Message):
|
|
|
- if message.chat.type != ChatType.PRIVATE:
|
|
|
- return
|
|
|
+ context = RequestContext(messages[0].chat.id, user_language)
|
|
|
+ is_approved = False
|
|
|
|
|
|
- media_group_id = message.media_group_id
|
|
|
+ content_group = ContentGroup()
|
|
|
+ caption = next((msg.caption for msg in messages if msg.caption), "")
|
|
|
|
|
|
- async def await_media_group():
|
|
|
- with suppress(CancelledError):
|
|
|
- await asyncio.sleep(2)
|
|
|
- async with self.media_groups_lock:
|
|
|
- messages = self.media_groups.pop(media_group_id, []) # type: ignore
|
|
|
- self.media_groups_tasks.pop(media_group_id, None) # type: ignore
|
|
|
+ for message in messages:
|
|
|
+ is_approved = await self._moderation_service.process(
|
|
|
+ context,
|
|
|
+ message.caption,
|
|
|
+ await self.get_b64image(message)
|
|
|
+ )
|
|
|
+
|
|
|
+ media = self._get_media(message)
|
|
|
+ if media:
|
|
|
+ content_group.append(ContentMediaItem(**media, caption=caption))
|
|
|
+
|
|
|
+ await self._responses_port.post_prepared(
|
|
|
+ context, content_group, is_approved
|
|
|
+ )
|
|
|
|
|
|
- await process_messages(messages)
|
|
|
+ async def _on_media(self, message: Message, user_language: str):
|
|
|
+ if message.chat.type != ChatType.PRIVATE:
|
|
|
+ return
|
|
|
|
|
|
- if media_group_id:
|
|
|
- async with self.media_groups_lock:
|
|
|
- self.media_groups.setdefault(media_group_id, []).append(message)
|
|
|
+ media_group_id = message.media_group_id
|
|
|
|
|
|
- task = self.media_groups_tasks.get(media_group_id)
|
|
|
- if task:
|
|
|
- task.cancel()
|
|
|
+ async def await_media_group():
|
|
|
+ with suppress(CancelledError):
|
|
|
+ await asyncio.sleep(2)
|
|
|
+ async with self._media_groups_lock:
|
|
|
+ messages = self.media_groups.pop(media_group_id, []) # type: ignore
|
|
|
+ self.media_groups_tasks.pop(media_group_id, None) # type: ignore
|
|
|
|
|
|
- self.media_groups_tasks[media_group_id] = asyncio.create_task(
|
|
|
- await_media_group()
|
|
|
- )
|
|
|
- return
|
|
|
+ await self._process_messages(messages, user_language)
|
|
|
|
|
|
- await process_messages([message])
|
|
|
+ if media_group_id:
|
|
|
+ async with self._media_groups_lock:
|
|
|
+ self._media_groups.setdefault(media_group_id, []).append(message)
|
|
|
+
|
|
|
+ task = self._media_groups_tasks.get(media_group_id)
|
|
|
+ if task:
|
|
|
+ task.cancel()
|
|
|
+
|
|
|
+ self._media_groups_tasks[media_group_id] = asyncio.create_task(
|
|
|
+ await_media_group()
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ await self._process_messages([message], user_language)
|
|
|
+
|
|
|
+ def setup(self):
|
|
|
+ self.message.register(self._on_media, F.photo | F.video)
|