فهرست منبع

Merge pull request #17 from librellium/feature/services

Feature/services
Librellium 1 ماه پیش
والد
کامیت
6fb6d69c1e

+ 20 - 9
anonflow/app.py

@@ -17,6 +17,8 @@ from anonflow.bot import (
 from anonflow.config import Config
 from anonflow.database import (
     Database,
+    BanRepository,
+    ModeratorRepository,
     UserRepository
 )
 from anonflow.moderation import (
@@ -24,6 +26,7 @@ from anonflow.moderation import (
     ModerationPlanner,
     RuleManager
 )
+from anonflow.services import ModeratorService, UserService
 from anonflow.translator import Translator
 
 from . import paths
@@ -45,7 +48,8 @@ class Application:
         self.dispatcher: Optional[Dispatcher] = None
         self.config: Optional[Config] = None
         self.database: Optional[Database] = None
-        self.user_repository: Optional[UserRepository] = None
+        self.moderator_service: Optional[ModeratorService] = None
+        self.user_service: Optional[UserService] = None
         self.translator: Optional[Translator] = None
         self.moderation_executor: Optional[ModerationExecutor] = None
         self.message_sender: Optional[MessageSender] = None
@@ -65,10 +69,14 @@ class Application:
         self.database = Database(config.get_database_url())
         await self.database.init()
 
-        self.user_repository = UserRepository(
+        self.user_service = UserService(
             self.database,
-            config.database.repositories.user.cache_size,
-            config.database.repositories.user.cache_ttl
+            UserRepository()
+        )
+        self.moderator_service = ModeratorService(
+            self.database,
+            BanRepository(),
+            ModeratorRepository()
         )
 
     def _init_logging(self):
@@ -101,11 +109,12 @@ class Application:
         dispatcher = req("dispatcher", self.dispatcher)
         config = req("config", self.config)
         translator = req("translator", self.translator)
-        user_repository = req("user_repository", self.user_repository)
+        moderator_service = req("moderator_service", self.moderator_service)
+        user_service = req("user_service", self.user_service)
 
         dispatcher.update.middleware(
             BlockedMiddleware(
-                user_repository=user_repository,
+                moderator_service=moderator_service,
                 translator=translator,
             )
         )
@@ -120,7 +129,7 @@ class Application:
 
         dispatcher.update.middleware(
             RegisteredMiddleware(
-                user_repository=user_repository,
+                user_service=user_service,
                 translator=translator,
             )
         )
@@ -179,7 +188,8 @@ class Application:
         dispatcher = req("dispatcher", self.dispatcher)
         config = req("config", self.config)
         database = req("database", self.database)
-        user_repository = req("user_repository", self.user_repository)
+        moderator_service = req("moderator_service", self.moderator_service)
+        user_service = req("user_service", self.user_service)
         translator = req("translator", self.translator)
         message_sender = req("message_sender", self.message_sender)
 
@@ -187,7 +197,8 @@ class Application:
             build(
                 config=config,
                 database=database,
-                user_repository=user_repository,
+                moderator_service=moderator_service,
+                user_service=user_service,
                 translator=translator,
                 message_sender=message_sender,
                 moderation_executor=self.moderation_executor,

+ 5 - 3
anonflow/bot/builder.py

@@ -3,8 +3,9 @@ from typing import Optional
 from aiogram import Router
 
 from anonflow.config import Config
-from anonflow.database import Database, UserRepository
+from anonflow.database import Database
 from anonflow.moderation import ModerationExecutor
+from anonflow.services import ModeratorService, UserService
 from anonflow.translator import Translator
 
 from .messaging import MessageSender
@@ -14,7 +15,8 @@ from .routers import InfoRouter, MediaRouter, StartRouter, TextRouter
 def build(
     config: Config,
     database: Database,
-    user_repository: UserRepository,
+    moderator_service: ModeratorService,
+    user_service: UserService,
     translator: Translator,
     message_sender: MessageSender,
     moderation_executor: Optional[ModerationExecutor] = None,
@@ -24,7 +26,7 @@ def build(
     routers = [
         StartRouter(
             translator=translator,
-            user_repository=user_repository
+            user_service=user_service
         ),
         InfoRouter(translator=translator),
         TextRouter(

+ 4 - 6
anonflow/bot/middleware/blocked.py

@@ -1,15 +1,15 @@
 from aiogram import BaseMiddleware
 from aiogram.types import Message
 
-from anonflow.database import UserRepository
+from anonflow.services import ModeratorService
 from anonflow.translator import Translator
 
 
 class BlockedMiddleware(BaseMiddleware):
-    def __init__(self, user_repository: UserRepository, translator: Translator):
+    def __init__(self, moderator_service: ModeratorService, translator: Translator):
         super().__init__()
 
-        self.user_repository = user_repository
+        self.moderator_service = moderator_service
         self.translator = translator
 
     async def __call__(self, handler, event, data):
@@ -17,9 +17,7 @@ class BlockedMiddleware(BaseMiddleware):
 
         message = getattr(event, "message", None)
         if isinstance(message, Message):
-            user = await self.user_repository.get(message.chat.id)
-
-            if user and user.is_blocked:
+            if await self.moderator_service.is_banned(message.chat.id):
                 await message.answer(_("messages.user.blocked", message))
                 return
 

+ 4 - 4
anonflow/bot/middleware/registered.py

@@ -2,15 +2,15 @@ from aiogram import BaseMiddleware
 from aiogram.enums import ChatType
 from aiogram.types import Message
 
-from anonflow.database import UserRepository
+from anonflow.services import UserService
 from anonflow.translator import Translator
 
 
 class RegisteredMiddleware(BaseMiddleware):
-    def __init__(self, user_repository: UserRepository, translator: Translator):
+    def __init__(self, user_service: UserService, translator: Translator):
         super().__init__()
 
-        self.user_repository = user_repository
+        self.user_service = user_service
         self.translator = translator
 
     async def __call__(self, handler, event, data):
@@ -20,7 +20,7 @@ class RegisteredMiddleware(BaseMiddleware):
         if isinstance(message, Message) and message.chat.type == ChatType.PRIVATE:
             text = message.text or message.caption or ""
 
-            is_user_exists = await self.user_repository.has(message.chat.id)
+            is_user_exists = await self.user_service.has(message.chat.id)
             if not is_user_exists and not text.startswith("/start"):
                 await message.answer(_("messages.user.start_required", message))
                 return

+ 1 - 1
anonflow/bot/routers/media.py

@@ -1,6 +1,6 @@
 import asyncio
 from asyncio import CancelledError
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional
 
 from aiogram import F, Router
 from aiogram.enums import ChatType

+ 4 - 4
anonflow/bot/routers/start.py

@@ -2,20 +2,20 @@ from aiogram import Router
 from aiogram.filters import CommandStart
 from aiogram.types import Message
 
-from anonflow.database import UserRepository
+from anonflow.services import UserService
 from anonflow.translator import Translator
 
 
 class StartRouter(Router):
-    def __init__(self, translator: Translator, user_repository: UserRepository):
+    def __init__(self, translator: Translator, user_service: UserService):
         super().__init__()
 
         self.translator = translator
-        self.user_repository = user_repository
+        self.user_service = user_service
 
     def setup(self):
         @self.message(CommandStart())
         async def on_start(message: Message):
-            await self.user_repository.add(message.chat.id)
+            await self.user_service.add(message.chat.id)
             _ = self.translator.get()
             await message.answer(_("messages.command.start", message=message))

+ 15 - 3
anonflow/database/__init__.py

@@ -1,5 +1,17 @@
 from .database import Database
-from .orm import User
-from .repositories import UserRepository
+from .orm import Ban, Moderator, User
+from .repositories import (
+    BanRepository,
+    ModeratorRepository,
+    UserRepository
+)
 
-__all__ = ["Database", "User", "UserRepository"]
+__all__ = [
+    "Database",
+    "Ban",
+    "Moderator",
+    "User",
+    "BanRepository",
+    "ModeratorRepository",
+    "UserRepository"
+]

+ 1 - 1
anonflow/database/database.py

@@ -9,7 +9,7 @@ class Database:
     def __init__(self, url: URL, echo: bool = False):
         self.url = url
 
-        self._engine = create_async_engine(self.url, echo=echo, future=True) #type: ignore
+        self._engine = create_async_engine(self.url, echo=echo)
         self._session_maker = sessionmaker(
             self._engine, expire_on_commit=False, class_=AsyncSession # type: ignore
         )

+ 50 - 2
anonflow/database/orm.py

@@ -1,11 +1,59 @@
-from sqlalchemy import Boolean, Column, Integer, String
+from sqlalchemy import Boolean, Column, DateTime, ForeignKey, func, Integer, String
+from sqlalchemy.orm import relationship
 
 from .base import Base
 
 
+class Ban(Base):
+    __tablename__ = "bans"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    chat_id = Column(
+        Integer,
+        ForeignKey("users.chat_id", ondelete="CASCADE"),
+        index=True,
+        nullable=False
+    )
+    is_active = Column(Boolean, nullable=False, default=True)
+
+    banned_at = Column(
+        DateTime(timezone=True),
+        server_default=func.now(),
+        nullable=False
+    )
+    unbanned_at = Column(DateTime(timezone=True), nullable=True)
+
+    banned_by = Column(Integer, ForeignKey("moderators.chat_id"))
+    unbanned_by = Column(Integer, ForeignKey("moderators.chat_id"))
+
+    user = relationship("User", back_populates="bans")
+
+class Moderator(Base):
+    __tablename__ = "moderators"
+
+    chat_id = Column(Integer, ForeignKey("users.chat_id"), primary_key=True)
+
+    can_approve_posts = Column(Boolean, nullable=False, default=True)
+    can_manage_bans = Column(Boolean, nullable=False, default=False)
+    can_manage_moderators = Column(Boolean, nullable=False, default=False)
+
+    user = relationship("User", back_populates="moderator")
+
 class User(Base):
     __tablename__ = "users"
 
     chat_id = Column(Integer, primary_key=True)
-    is_blocked = Column(Boolean, default=False)
     language = Column(String(2), default="ru")
+
+    moderator = relationship(
+        "Moderator",
+        uselist=False,
+        back_populates="user",
+        cascade="all, delete-orphan"
+    )
+    bans = relationship(
+        "Ban",
+        back_populates="user",
+        cascade="all, delete-orphan",
+        order_by="Ban.banned_at.desc()"
+    )

+ 7 - 1
anonflow/database/repositories/__init__.py

@@ -1,3 +1,9 @@
+from .ban import BanRepository
+from .moderator import ModeratorRepository
 from .user import UserRepository
 
-__all__ = ["UserRepository"]
+__all__ = [
+    "BanRepository",
+    "ModeratorRepository",
+    "UserRepository"
+]

+ 41 - 0
anonflow/database/repositories/ban.py

@@ -0,0 +1,41 @@
+from aiogram.types import ChatIdUnion
+from sqlalchemy import func, select, update
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from anonflow.database.orm import Ban
+
+
+class BanRepository:
+    async def ban(self, session: AsyncSession, actor_chat_id: ChatIdUnion, chat_id: ChatIdUnion):
+        async with session.begin():
+            ban = Ban(
+                chat_id=chat_id,
+                banned_by=actor_chat_id
+            )
+            session.add(ban)
+
+    async def is_banned(self, session: AsyncSession, chat_id: ChatIdUnion):
+        result = await session.execute(
+            select(Ban)
+            .where(
+                Ban.chat_id == chat_id,
+                Ban.is_active.is_(True)
+            )
+            .limit(1)
+        )
+        return bool(result.scalar_one_or_none())
+
+    async def unban(self, session: AsyncSession, actor_chat_id: ChatIdUnion, chat_id: ChatIdUnion):
+        async with session.begin():
+            await session.execute(
+                update(Ban)
+                .where(
+                    Ban.chat_id == chat_id,
+                    Ban.is_active.is_(True)
+                )
+                .values(
+                    is_active=False,
+                    unbanned_at=func.now(),
+                    unbanned_by=actor_chat_id
+                )
+            )

+ 53 - 0
anonflow/database/repositories/base.py

@@ -0,0 +1,53 @@
+from typing import Any, Dict, List, Type
+
+from sqlalchemy import delete, inspect, select, update
+from sqlalchemy.ext.asyncio import AsyncSession
+
+
+class BaseRepository:
+    model: Type[Any]
+
+    def __init__(self):
+        self._column_names = frozenset(
+            c.name for c in inspect(self.model).columns
+        )
+
+    async def _add(self, session: AsyncSession, model_args: Dict[str, Any]):
+        async with session.begin():
+            obj = self.model(**model_args)
+            session.add(obj)
+
+    async def _get(self, session: AsyncSession, filters: Dict[str, Any], options: List[Any] = []):
+        result = await session.execute(
+            select(self.model)
+            .options(*options)
+            .filter_by(**filters)
+        )
+        return result.scalar_one_or_none()
+
+    async def _has(self, session: AsyncSession, filters: Dict[str, Any]):
+        result = await session.execute(
+            select(1)
+            .select_from(self.model)
+            .filter_by(**filters)
+            .limit(1)
+        )
+        return bool(result.scalar_one_or_none())
+
+    async def _remove(self, session: AsyncSession, filters: Dict[str, Any]):
+        async with session.begin():
+            await session.execute(
+                delete(self.model)
+                .filter_by(**filters)
+            )
+
+    async def _update(self, session: AsyncSession, filters: Dict[str, Any], fields: Dict[str, Any]):
+        if not fields:
+            return
+
+        async with session.begin():
+            await session.execute(
+                update(self.model)
+                .filter_by(**filters)
+                .values(**fields)
+            )

+ 98 - 0
anonflow/database/repositories/moderator.py

@@ -0,0 +1,98 @@
+from dataclasses import dataclass
+from typing import Optional
+
+from aiogram.types import ChatIdUnion
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import joinedload
+
+from .base import BaseRepository
+from anonflow.database.orm import Moderator
+
+
+@dataclass
+class ModeratorPermissions:
+    can_approve: bool
+    can_ban: bool
+    can_manage_moderators: bool
+
+class ModeratorRepository(BaseRepository):
+    model = Moderator
+
+    async def add(
+        self,
+        session: AsyncSession,
+        chat_id: ChatIdUnion,
+        *,
+        can_approve_posts: bool = True,
+        can_manage_bans: bool = False,
+        can_manage_moderators: bool = False
+    ):
+        await super()._add(
+            session,
+            model_args={
+                "chat_id": chat_id,
+                "can_approve_posts": can_approve_posts,
+                "can_manage_bans": can_manage_bans,
+                "can_manage_moderators": can_manage_moderators
+            }
+        )
+
+    async def get(self, session: AsyncSession, chat_id: ChatIdUnion) -> Optional[Moderator]:
+        return await super()._get(
+            session,
+            filters={"chat_id": chat_id},
+            options=[
+                joinedload(Moderator.user)
+            ]
+        )
+
+    async def get_permissions(self, session: AsyncSession, chat_id: ChatIdUnion):
+        result = await self.get(session, chat_id)
+        if result:
+            return ModeratorPermissions(
+                result.can_approve_posts.value,
+                result.can_manage_bans.value,
+                result.can_manage_moderators.value
+            )
+
+    async def has(self, session: AsyncSession, chat_id: ChatIdUnion):
+        return await super()._has(
+            session,
+            filters={"chat_id": chat_id}
+        )
+
+    async def remove(self, session: AsyncSession, chat_id: ChatIdUnion):
+        await super()._remove(
+            session,
+            filters={"chat_id": chat_id}
+        )
+
+    async def update(self, session: AsyncSession, chat_id: ChatIdUnion, **fields):
+        await super()._update(
+            session,
+            filters={"chat_id": chat_id}, fields=fields
+        )
+
+    async def update_permissions(
+        self,
+        session: AsyncSession,
+        chat_id: ChatIdUnion,
+        *,
+        can_approve_posts: Optional[bool] = None,
+        can_manage_bans: Optional[bool] = None,
+        can_manage_moderators: Optional[bool] = None
+    ):
+        to_update = {}
+        for key, value in (
+            ("can_approve_posts", can_approve_posts),
+            ("can_manage_bans", can_manage_bans),
+            ("can_manage_moderators", can_manage_moderators),
+        ):
+            if value is not None:
+                to_update[key] = value
+
+        await self.update(
+            session,
+            chat_id,
+            **to_update
+        )

+ 39 - 75
anonflow/database/repositories/user.py

@@ -1,80 +1,44 @@
-import asyncio
-import logging
-
 from aiogram.types import ChatIdUnion
-from cachetools import TTLCache
-from sqlalchemy import exists, update
-from sqlalchemy.exc import IntegrityError
-from sqlalchemy.future import select
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import selectinload, joinedload
 
-from anonflow.database.database import Database
+from .base import BaseRepository
 from anonflow.database.orm import User
 
 
-class UserRepository:
-    def __init__(self, db: Database, cache_size: int, cache_ttl: int):
-        self._logger = logging.getLogger(__name__)
-
-        self._database = db
-        self._cache = TTLCache(maxsize=cache_size, ttl=cache_ttl)
-        self._cache_lock = asyncio.Lock()
-
-    async def add(self, chat_id: ChatIdUnion):
-        async with self._database.get_session() as session:
-            try:
-                user = User(chat_id=chat_id)
-                session.add(user)
-                await session.commit()
-
-                async with self._cache_lock:
-                    self._cache[chat_id] = user
-            except IntegrityError:
-                await session.rollback()
-                self._logger.warning("User chat_id=%s already exists.", chat_id)
-
-    async def block(self, chat_id: ChatIdUnion):
-        await self.update(chat_id, is_blocked=True)
-
-    async def get(self, chat_id: ChatIdUnion):
-        async with self._cache_lock:
-            user = self._cache.get(chat_id)
-            if user: return user
-
-        async with self._database.get_session() as session:
-            result = await session.execute(
-                select(User).where(User.chat_id == chat_id)
-            )
-            return result.scalar_one_or_none()
-
-    async def has(self, chat_id: ChatIdUnion):
-        async with self._cache_lock:
-            if self._cache.get(chat_id):
-                return True
-
-        async with self._database.get_session() as session:
-            result = await session.execute(
-                select(exists().where(User.chat_id == chat_id))
-            )
-            return result.scalar()
-
-    async def unblock(self, chat_id: ChatIdUnion):
-        await self.update(chat_id, is_blocked=False)
-
-    async def update(self, chat_id: ChatIdUnion, **fields):
-        async with self._database.get_session() as session:
-            try:
-                await session.execute(
-                    update(User)
-                    .where(User.chat_id == chat_id)
-                    .values(**fields)
-                    .execution_options(synchronize_session="fetch")
-                )
-                await session.commit()
-
-                user = await session.get(User, chat_id)
-                async with self._cache_lock:
-                    if user:
-                        self._cache[chat_id] = user
-            except IntegrityError:
-                await session.rollback()
-                self._logger.warning("Failed to update user chat_id=%s", chat_id)
+class UserRepository(BaseRepository):
+    model = User
+
+    async def add(self, session: AsyncSession, chat_id: ChatIdUnion):
+        await super()._add(
+            session,
+            model_args={"chat_id": chat_id}
+        )
+
+    async def get(self, session: AsyncSession, chat_id: ChatIdUnion):
+        return await super()._get(
+            session,
+            filters={"chat_id": chat_id},
+            options=[
+                selectinload(User.bans),
+                joinedload(User.moderator)
+            ]
+        )
+
+    async def has(self, session: AsyncSession, chat_id: ChatIdUnion):
+        return await super()._has(
+            session,
+            filters={"chat_id": chat_id}
+        )
+
+    async def remove(self, session: AsyncSession, chat_id: ChatIdUnion):
+        await super()._remove(
+            session,
+            filters={"chat_id": chat_id}
+        )
+
+    async def update(self, session: AsyncSession, chat_id: ChatIdUnion, **fields):
+        await super()._update(
+            session,
+            filters={"chat_id": chat_id}, fields=fields
+        )

+ 4 - 0
anonflow/services/__init__.py

@@ -0,0 +1,4 @@
+from .moderator import ForbiddenError, ModeratorService
+from .user import UserService
+
+__all__ = ["ForbiddenError", "ModeratorService", "UserService"]

+ 137 - 0
anonflow/services/moderator.py

@@ -0,0 +1,137 @@
+import logging
+from typing import Optional
+
+from aiogram.types import ChatIdUnion
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from anonflow.database import (
+    Database,
+    BanRepository,
+    ModeratorRepository
+)
+
+
+class ForbiddenError(PermissionError): ...
+
+class ModeratorService:
+    def __init__(
+        self,
+        database: Database,
+        ban_repository: BanRepository,
+        moderator_repository: ModeratorRepository
+    ):
+        self._logger = logging.getLogger(__name__)
+
+        self._database = database
+        self._ban_repository = ban_repository
+        self._moderator_repository = moderator_repository
+
+    async def add(self, actor_chat_id: ChatIdUnion, chat_id: ChatIdUnion):
+        try:
+            async with self._database.get_session() as session:
+                if await self._can_manage_moderators(session, actor_chat_id):
+                    await self._moderator_repository.add(session, chat_id)
+                else:
+                    raise ForbiddenError()
+        except IntegrityError:
+            self._logger.warning("Failed to add moderator chat_id=%s", chat_id)
+
+    async def ban(self, actor_chat_id: ChatIdUnion, chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            if await self._can_manage_bans(session, actor_chat_id):
+                await self._ban_repository.ban(session, actor_chat_id, chat_id)
+            else:
+                raise ForbiddenError()
+
+    async def _get_permission(self, session: AsyncSession, actor_chat_id: ChatIdUnion, name: str) -> bool:
+        moderator = await self._moderator_repository.get(session, actor_chat_id)
+        return getattr(getattr(moderator, name, None), "value", False)
+
+    async def _can_approve_posts(self, session: AsyncSession, actor_chat_id: ChatIdUnion):
+        return await self._get_permission(session, actor_chat_id, "can_approve_posts")
+
+    async def can_approve_posts(self, actor_chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._can_approve_posts(session, actor_chat_id)
+
+    async def _can_manage_bans(self, session: AsyncSession, actor_chat_id: ChatIdUnion):
+        return await self._get_permission(session, actor_chat_id, "can_manage_bans")
+
+    async def can_manage_bans(self, actor_chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._can_manage_bans(session, actor_chat_id)
+
+    async def _can_manage_moderators(self, session: AsyncSession, actor_chat_id: ChatIdUnion):
+        return await self._get_permission(session, actor_chat_id, "can_manage_moderators")
+
+    async def can_manage_moderators(self, actor_chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._can_manage_moderators(session, actor_chat_id)
+
+    async def get(self, chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._moderator_repository.get(session, chat_id)
+
+    async def get_permissions(self, chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._moderator_repository.get_permissions(session, chat_id)
+
+    async def has(self, chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._moderator_repository.has(session, chat_id)
+
+    async def is_banned(self, chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._ban_repository.is_banned(session, chat_id)
+
+    async def remove(self, actor_chat_id: ChatIdUnion, chat_id: ChatIdUnion):
+        try:
+            async with self._database.get_session() as session:
+                if await self._can_manage_moderators(session, actor_chat_id):
+                    await self._moderator_repository.remove(session, chat_id)
+                else:
+                    raise ForbiddenError()
+        except IntegrityError:
+            self._logger.warning("Failed to remove moderator chat_id=%s", chat_id)
+
+    async def unban(self, actor_chat_id: ChatIdUnion, chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            if await self._can_manage_bans(session, actor_chat_id):
+                await self._ban_repository.unban(session, actor_chat_id, chat_id)
+            else:
+                raise ForbiddenError()
+
+    async def update(self, actor_chat_id: ChatIdUnion, chat_id: ChatIdUnion, **fields):
+        try:
+            async with self._database.get_session() as session:
+                if await self._can_manage_moderators(session, actor_chat_id):
+                    await self._moderator_repository.update(session, chat_id, **fields)
+                else:
+                    raise ForbiddenError()
+        except IntegrityError:
+            self._logger.warning("Failed to update moderator chat_id=%s", chat_id)
+
+    async def update_permissions(
+        self,
+        actor_chat_id: ChatIdUnion,
+        chat_id: ChatIdUnion,
+        *,
+        can_approve_posts: Optional[bool] = None,
+        can_manage_bans: Optional[bool] = None,
+        can_manage_moderators: Optional[bool] = None
+    ):
+        try:
+            async with self._database.get_session() as session:
+                if await self._can_manage_moderators(session, actor_chat_id):
+                    await self._moderator_repository.update_permissions(
+                        session,
+                        chat_id,
+                        can_approve_posts=can_approve_posts,
+                        can_manage_bans=can_manage_bans,
+                        can_manage_moderators=can_manage_moderators
+                    )
+                else:
+                    raise ForbiddenError()
+        except IntegrityError:
+            self._logger.warning("Failed to update moderator chat_id=%s", chat_id)

+ 43 - 0
anonflow/services/user.py

@@ -0,0 +1,43 @@
+import logging
+
+from aiogram.types import ChatIdUnion
+from sqlalchemy.exc import IntegrityError
+
+from anonflow.database import Database, UserRepository
+
+
+class UserService:
+    def __init__(self, database: Database, user_repository: UserRepository):
+        self._logger = logging.getLogger(__name__)
+
+        self._database = database
+        self._user_repository = user_repository
+
+    async def add(self, chat_id: ChatIdUnion):
+        try:
+            async with self._database.get_session() as session:
+                await self._user_repository.add(session, chat_id)
+        except IntegrityError:
+            self._logger.warning("Failed to add user chat_id=%s", chat_id)
+
+    async def get(self, chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._user_repository.get(session, chat_id)
+
+    async def has(self, chat_id: ChatIdUnion):
+        async with self._database.get_session() as session:
+            return await self._user_repository.has(session, chat_id)
+
+    async def remove(self, chat_id: ChatIdUnion):
+        try:
+            async with self._database.get_session() as session:
+                await self._user_repository.remove(session, chat_id)
+        except IntegrityError:
+            self._logger.warning("Failed to remove user chat_id=%s", chat_id)
+
+    async def update(self, chat_id: ChatIdUnion, **fields):
+        try:
+            async with self._database.get_session() as session:
+                await self._user_repository.update(session, chat_id, **fields)
+        except IntegrityError:
+            self._logger.warning("Failed to update user chat_id=%s", chat_id)