service.py 1.1 KB

12345678910111213141516171819202122232425262728
  1. from typing import Optional
  2. from anonflow.interfaces import PostResponsesPort
  3. from anonflow.bot.transport.types import RequestContext
  4. from .events import ModerationDecisionEvent, ModerationStartedEvent
  5. from .executor import ModerationExecutor
  6. class ModerationService:
  7. def __init__(
  8. self,
  9. responses_port: PostResponsesPort,
  10. moderation_executor: ModerationExecutor
  11. ):
  12. self._responses_port = responses_port
  13. self._moderation_executor = moderation_executor
  14. async def process(self, context: RequestContext, text: Optional[str] = None, image: Optional[str] = None):
  15. is_approved = False
  16. async for event in self._moderation_executor.process(text, image):
  17. if isinstance(event, ModerationDecisionEvent):
  18. is_approved = event.is_approved
  19. await self._responses_port.post_moderation_decision(context, event.is_approved, event.reason)
  20. elif isinstance(event, ModerationStartedEvent):
  21. await self._responses_port.post_moderation_started(context)
  22. return is_approved