Răsfoiți Sursa

Merge pull request #29 from librellium/feature/cli

Feature/cli
Librellium 12 ore în urmă
părinte
comite
671b578349

+ 117 - 0
anonflow/cli/moderator_manager.py

@@ -0,0 +1,117 @@
+import asyncio
+import inspect
+from ast import literal_eval
+from contextlib import suppress
+from typing import Any, Dict, List
+
+from prompt_toolkit import PromptSession
+from prompt_toolkit.completion import WordCompleter
+
+from anonflow import paths
+from anonflow.config import Config
+from anonflow.constants import SYSTEM_USER_ID
+from anonflow.database import (
+    BanRepository,
+    Database,
+    ModeratorRepository,
+)
+from anonflow.services import ModeratorService
+
+
+class ModeratorManager:
+    def __init__(self):
+        self._allowed_commands = (
+            "add", "can", "get",
+            "has", "remove", "update",
+        )
+
+        self._completer = WordCompleter(self._allowed_commands, ignore_case=True)
+        self._session = PromptSession(completer=self._completer)
+
+        self._config = Config.load(paths.CONFIG_FILEPATH)
+        self._database = Database(self._config.get_database_url())
+        self._service = ModeratorService(
+            self._database, BanRepository(), ModeratorRepository()
+        )
+
+    @staticmethod
+    async def _execute(method, *args, **kwargs):
+        try:
+            sig = inspect.signature(method)
+            if "actor_user_id" in sig.parameters:
+                result = await method(SYSTEM_USER_ID, *args, **kwargs)
+            else:
+                result = await method(*args, **kwargs)
+            if result is not None:
+                return result
+        except Exception as e:
+            return str(e)
+
+    @staticmethod
+    def _parse_text(text: str):
+        text = text.strip()
+        if not text:
+            return
+
+        command, *raw_args = text.split()
+
+        args: List[Any] = []
+        kwargs: Dict[str, Any] = {}
+
+        for arg in raw_args:
+            if "=" in arg and all(s := arg.split("=", maxsplit=1)):
+                key, value = s[0].strip(), s[1].strip()
+                if key != "actor_user_id":
+                    with suppress(ValueError):
+                        value = literal_eval(value)
+                    kwargs[key] = value
+            else:
+                with suppress(ValueError):
+                    arg = literal_eval(arg)
+                args.append(arg)
+
+        return command, args, kwargs
+
+    async def close(self):
+        await self._database.close()
+
+    async def init(self):
+        await self._database.init()
+
+    async def run(self):
+        while True:
+            try:
+                text: str = await self._session.prompt_async("[ModeratorManager]> ")
+            except KeyboardInterrupt:
+                print("Use CTRL+D or type 'exit' to exit")
+                continue
+            except EOFError:
+                break
+            else:
+                if parsed := self._parse_text(text):
+                    command, args, kwargs = parsed
+
+                    if command in ["exit", "quit", "q"]:
+                        break
+
+                    if command in self._allowed_commands and (
+                        method := getattr(self._service, command, None)
+                    ):
+                        result = await self._execute(method, *args, **kwargs)
+                        if result is not None:
+                            print(result)
+                    else:
+                        print(f"Unknown command: {command}")
+
+
+async def main():
+    manager = ModeratorManager()
+    try:
+        await manager.init()
+        await manager.run()
+    finally:
+        await manager.close()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 17 - 0
anonflow/database/orm.py

@@ -45,6 +45,23 @@ class Moderator(Base):
 
     user = relationship("User", back_populates="moderator")
 
+    def __str__(self):
+        permissions = []
+        if bool(self.is_root):
+            permissions.append("root")
+        else:
+            for permission in dir(self):
+                if (
+                    permission.startswith("can_")
+                    and getattr(self, permission, False)
+                ):
+                    permissions.append(permission)
+
+        return (
+            f"Moderator user_id={self.user_id} "
+            f"permissions={','.join(permissions)}"
+        )
+
 
 class User(Base):
     __tablename__ = "users"

+ 1 - 1
anonflow/services/moderator/service.py

@@ -25,7 +25,7 @@ class ModeratorService:
 
     @staticmethod
     def _assert_not_self(actor_user_id: int, user_id: int):
-        if actor_user_id == user_id:
+        if int(actor_user_id) == int(user_id):
             raise SelfActionError(
                 f"Moderator user_id={actor_user_id} cannot perform this action on themselves, target user_id={user_id}"
             )

+ 2 - 0
requirements.txt

@@ -24,6 +24,7 @@ Mako==1.3.10
 MarkupSafe==3.0.3
 multidict==6.7.0
 openai==2.8.1
+prompt_toolkit==3.0.52
 propcache==0.4.1
 pydantic==2.11.10
 pydantic_core==2.33.2
@@ -34,4 +35,5 @@ SQLAlchemy==2.0.45
 tqdm==4.67.1
 typing-inspection==0.4.2
 typing_extensions==4.15.0
+wcwidth==0.6.0
 yarl==1.22.0