diff --git a/.vscode/settings.json b/.vscode/settings.json index 6d4bbfd15..7099dd262 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,7 +11,7 @@ "source.organizeImports.ruff": "explicit" } }, - "python.languageServer": "Pylance", + "python.languageServer": "None", "python.analysis.typeCheckingMode": "strict", "python.analysis.autoFormatStrings": true, "python.analysis.completeFunctionParens": true, diff --git a/tux/cogs/moderation/__init__.py b/tux/cogs/moderation/__init__.py index 1f0c8be96..0bc0d66b7 100644 --- a/tux/cogs/moderation/__init__.py +++ b/tux/cogs/moderation/__init__.py @@ -604,3 +604,184 @@ async def is_jailed(self, guild_id: int, user_id: int) -> bool: active_restriction_type=CaseType.JAIL, inactive_restriction_type=CaseType.UNJAIL, ) + + # ------------------------------------------------------------------ + # Unified mixed-argument executor (dynamic moderation system) + # ------------------------------------------------------------------ + async def execute_mixed_mod_action( + self, + ctx: commands.Context[Tux], + config: "ModerationCommandConfig", # quoted to avoid circular import at runtime + user: discord.Member | discord.User, + mixed_args: str, + ) -> None: + """Parse *mixed_args* according to *config* and execute the moderation flow. + + This serves as the single entry-point for all dynamically generated + moderation commands. It handles: + 1. Mixed-argument parsing (positional + flags). + 2. Validation based on *config* (duration required?, purge range?, etc.). + 3. Permission / sanity checks via *check_conditions*. + 4. Building the *actions* list and delegating to :py:meth:`execute_mod_action`. + """ + + from tux.utils.mixed_args import parse_mixed_arguments # local import to avoid heavy top-level deps + from tux.utils.constants import CONST # default reason constant + + assert ctx.guild, "This command can only be used in guild context." # noqa: S101 + + parsed = parse_mixed_arguments(mixed_args or "") + ok, validated = self._validate_args(config, parsed) + if not ok: + await ctx.send(validated["error"], ephemeral=True) # type: ignore[index] + return + + duration = validated["duration"] # type: ignore[assignment] + purge = validated["purge"] # type: ignore[assignment] + reason = (validated.get("reason") or CONST.DEFAULT_REASON) # type: ignore[assignment] + silent = validated["silent"] # type: ignore[assignment] + + # ------------------------------------------------------------------ + # Permission / sanity checks + # ------------------------------------------------------------------ + if not await self.check_conditions(ctx, user, ctx.author, config.name): + return + + # ------------------------------------------------------------------ + # Build Discord action coroutine(s) + # ------------------------------------------------------------------ + # Prepare arg bundle for the lambda / callable + arg_bundle: dict[str, Any] = { + "duration": duration, + "purge": purge, + "silent": silent, + } + + coroutine_or_none = config.discord_action(ctx.guild, user, reason, arg_bundle) + actions: list[tuple[Any, type[Any]]] = [] + if coroutine_or_none is not None: + actions.append((coroutine_or_none, type(None))) + + # ------------------------------------------------------------------ + # Delegate to existing helper that handles DM, case creation, etc. + # ------------------------------------------------------------------ + await self.execute_mod_action( + ctx=ctx, + case_type=config.case_type, + user=user, + reason=reason, + silent=silent, + dm_action=config.dm_action, + actions=actions, + duration=duration, + ) + + # ------------------------------------------------------------------ + # Validation helper for dynamic moderation commands + # ------------------------------------------------------------------ + def _validate_args(self, config: "ModerationCommandConfig", parsed: dict[str, Any]) -> tuple[bool, dict[str, Any]]: + """Validate *parsed* arguments against *config* rules. + + Returns (is_valid, validated_dict). On failure sends the error message + via the ctx stored in validated_dict["ctx"] and returns False. + """ + validated: dict[str, Any] = {} + + # Duration + duration = parsed.get("duration") + if config.supports_duration: + if not duration: + return False, {"error": "Duration required (e.g. `14d`)."} + from tux.utils.functions import parse_time_string + try: + parse_time_string(duration) # ensure valid + except Exception: + return False, {"error": "Invalid duration format."} + validated["duration"] = duration + else: + validated["duration"] = None + + # Purge + purge_raw = parsed.get("purge") + if config.supports_purge: + try: + purge_val = int(purge_raw or 0) + except ValueError: + return False, {"error": "Purge must be an integer 0-7."} + if not 0 <= purge_val <= 7: + return False, {"error": "Purge must be between 0 and 7."} + validated["purge"] = purge_val + else: + validated["purge"] = 0 + + validated["reason"] = parsed.get("reason") + validated["silent"] = bool(parsed.get("silent", False)) + return True, validated + + # ------------------------------------------------------------------ + # New flag-based dynamic executor (replacing mixed-args approach) + # ------------------------------------------------------------------ + async def execute_flag_mod_action( + self, + ctx: commands.Context[Tux], + config: "ModerationCommandConfig", + user: discord.Member | discord.User, + flags: Any, + reason: str, + ) -> None: + """Execute moderation flow based on *flags* parsed by FlagConverter. + + This is the preferred pathway for dynamically generated moderation + commands that rely on discord.py's native FlagConverter parsing. + """ + + from tux.utils.constants import CONST + + assert ctx.guild, "Command must run in a guild context." # noqa: S101 + + duration = getattr(flags, "duration", None) if flags is not None else None + purge = getattr(flags, "purge", 0) if flags is not None else 0 + silent = getattr(flags, "silent", False) if flags is not None else False + + # Validation based on config + if config.supports_duration and not duration: + await ctx.send("Duration required (e.g. 14d).", ephemeral=True) + return + if not config.supports_duration: + duration = None + + if not config.supports_purge: + purge = 0 + else: + if not isinstance(purge, int) or not 0 <= purge <= 7: + await ctx.send("Purge must be between 0 and 7.", ephemeral=True) + return + + reason_final = reason or CONST.DEFAULT_REASON + + # Permission / sanity checks + if not await self.check_conditions(ctx, user, ctx.author, config.name): + return + + # Build arg bundle for discord_action + arg_bundle: dict[str, Any] = { + "duration": duration, + "purge": purge, + "silent": silent, + } + + coroutine_or_none = config.discord_action(ctx.guild, user, reason_final, arg_bundle) + actions: list[tuple[Any, type[Any]]] = [] + if coroutine_or_none is not None: + actions.append((coroutine_or_none, type(None))) + + await self.execute_mod_action( + ctx=ctx, + case_type=config.case_type, + user=user, + reason=reason_final, + silent=silent, + dm_action=config.dm_action, + actions=actions, + duration=duration, + ) diff --git a/tux/cogs/moderation/action_mixin.py b/tux/cogs/moderation/action_mixin.py new file mode 100644 index 000000000..f1bceb113 --- /dev/null +++ b/tux/cogs/moderation/action_mixin.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from collections.abc import Awaitable, Callable +from typing import Any + +import discord +from discord.ext import commands +from prisma.enums import CaseType + +from tux.cogs.moderation import ModerationCogBase +from tux.utils.constants import CONST +from tux.utils.transformers import MemberOrUser + +__all__ = ["ModerationActionMixin"] + + +class ModerationActionMixin(ModerationCogBase): + """Shared execution helper for moderation actions. + + This mixin builds the *actions* list and delegates to + :py:meth:`ModerationCogBase.execute_mod_action` so concrete commands only + need to supply metadata and the Discord API coroutine. + """ + + async def _run_action( # noqa: WPS211 (many params is ok for clarity) + self, + ctx: commands.Context, # noqa: D401 + *, + member: MemberOrUser, + reason: str, + duration: str | None, + purge: int, + silent: bool, + case_type: CaseType, + dm_verb: str, + discord_action: Callable[[], Awaitable[Any]] | None = None, + ) -> None: + """Validate and execute the moderation workflow.""" + + assert ctx.guild is not None, "Guild-only command" # noqa: S101 + + # Permission / sanity checks + if not await self.check_conditions(ctx, member, ctx.author, case_type.name.lower()): + return + + if purge and not (0 <= purge <= 7): + await ctx.send("Purge must be between 0 and 7 days.", ephemeral=True) + return + + final_reason = reason or CONST.DEFAULT_REASON + + actions: list[tuple[Awaitable[Any], type[Any]]] = [] + if discord_action is not None: + actions.append((discord_action(), type(None))) + + await self.execute_mod_action( + ctx=ctx, + case_type=case_type, + user=member, + reason=final_reason, + silent=silent, + dm_action=dm_verb, + actions=actions, + duration=duration, + ) + + +# --------------------------------------------------------------------------- +# No-op setup so the cog loader skips this util module +# --------------------------------------------------------------------------- + + +async def setup(bot): # type: ignore[unused-argument] + """Utility module – nothing to load.""" + return \ No newline at end of file diff --git a/tux/cogs/moderation/ban.py b/tux/cogs/moderation/ban.py deleted file mode 100644 index 5f0b2c2af..000000000 --- a/tux/cogs/moderation/ban.py +++ /dev/null @@ -1,69 +0,0 @@ -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import BanFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class Ban(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.ban.usage = generate_usage(self.ban, BanFlags) - - @commands.hybrid_command(name="ban", aliases=["b"]) - @commands.guild_only() - @checks.has_pl(3) - async def ban( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: BanFlags, - ) -> None: - """ - Ban a member from the server. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context in which the command is being invoked. - member : discord.Member - The member to ban. - flags : BanFlags - The flags for the command. (reason: str, purge: int (< 7), silent: bool) - - Raises - ------ - discord.Forbidden - If the bot is unable to ban the user. - discord.HTTPException - If an error occurs while banning the user. - """ - - assert ctx.guild - - # Check if moderator has permission to ban the member - if not await self.check_conditions(ctx, member, ctx.author, "ban"): - return - - # Execute ban with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.BAN, - user=member, - reason=flags.reason, - silent=flags.silent, - dm_action="banned", - actions=[ - (ctx.guild.ban(member, reason=flags.reason, delete_message_seconds=flags.purge * 86400), type(None)), - ], - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(Ban(bot)) diff --git a/tux/cogs/moderation/ban_static.py b/tux/cogs/moderation/ban_static.py new file mode 100644 index 000000000..c5f4b5208 --- /dev/null +++ b/tux/cogs/moderation/ban_static.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import discord +from discord.ext import commands +from prisma.enums import CaseType + +from tux.cogs.moderation.action_mixin import ModerationActionMixin +from tux.cogs.moderation.common import ModCmdInfo, mod_command +from tux.utils.transformers import MemberOrUser +from discord.ext import commands as ext_cmds + + +# ---------------- FlagConverter for prefix command ------------------------- + + +class BanFlags(ext_cmds.FlagConverter, delimiter=" ", prefix="-"): # noqa: D401 + """Flags for `>ban` command.""" + + purge: int = ext_cmds.flag( + default=0, + aliases=["p"], + description="Days of messages to delete (0-7)", + ) + + silent: bool = ext_cmds.flag( + default=False, + aliases=["s", "quiet"], + description="Don't DM the target", + ) + +BAN_INFO = ModCmdInfo( + name="ban", + aliases=["b"], + description="Ban a member from the server.", + case_type=CaseType.BAN, + required_pl=3, + dm_verb="banned", + supports_purge=True, +) + + +class BanCog(ModerationActionMixin, commands.Cog): + """Ban command implemented with explicit definition.""" + + def __init__(self, bot: commands.Bot) -> None: # noqa: D401 + super().__init__(bot) # type: ignore[misc] + # Register commands returned by decorator + prefix_cmd, slash_cmd = self._create_commands() + + bot.add_command(prefix_cmd) + bot.tree.add_command(slash_cmd) + + # ------------------------------------------------------------------ + # Command definitions + # ------------------------------------------------------------------ + + def _create_commands(self): # noqa: D401 + @mod_command(BAN_INFO) + async def _ban( + self: BanCog, + ctx: commands.Context, + member: MemberOrUser, + *, + flags: BanFlags | None = None, + reason: str = "", + ) -> None: # noqa: D401 + purge_val = flags.purge if flags else 0 # type: ignore[union-attr] + silent_val = flags.silent if flags else False # type: ignore[union-attr] + + await self._run_action( + ctx, + member=member, + reason=reason, + duration=None, + purge=purge_val, + silent=silent_val, + case_type=BAN_INFO.case_type, + dm_verb=BAN_INFO.dm_verb, + discord_action=lambda: ctx.guild.ban( # type: ignore[return-value] + member, + reason=reason, + delete_message_seconds=purge_val * 86_400, + ), + ) + + # decorator returns tuple of commands + return _ban # type: ignore[return-value] + + +async def setup(bot: commands.Bot): # noqa: D401 + await bot.add_cog(BanCog(bot)) \ No newline at end of file diff --git a/tux/cogs/moderation/command_config.py b/tux/cogs/moderation/command_config.py index 127b1863a..83b380740 100644 --- a/tux/cogs/moderation/command_config.py +++ b/tux/cogs/moderation/command_config.py @@ -158,6 +158,8 @@ def get_help_text(self) -> str: required_permission_level=1, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, dm_action="warned", discord_action=lambda guild, member, reason, args: None, # No Discord action for warns ), @@ -169,6 +171,8 @@ def get_help_text(self) -> str: required_permission_level=2, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, dm_action="jailed", discord_action=lambda guild, member, reason, args: None, # Handled in base class ), @@ -180,6 +184,8 @@ def get_help_text(self) -> str: required_permission_level=2, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, dm_action="unjailed", discord_action=lambda guild, member, reason, args: None, # Handled in base class ), @@ -191,6 +197,8 @@ def get_help_text(self) -> str: required_permission_level=3, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, requires_member=False, dm_action="unbanned", discord_action=lambda guild, member, reason, args: guild.unban(member, reason=reason), @@ -203,6 +211,8 @@ def get_help_text(self) -> str: required_permission_level=2, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, dm_action="untimeout", discord_action=lambda guild, member, reason, args: member.timeout(None, reason=reason), ), @@ -214,6 +224,8 @@ def get_help_text(self) -> str: required_permission_level=2, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, dm_action="snippet banned", discord_action=lambda guild, member, reason, args: None, # Handled in base class ), @@ -225,6 +237,8 @@ def get_help_text(self) -> str: required_permission_level=2, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, dm_action="snippet unbanned", discord_action=lambda guild, member, reason, args: None, # Handled in base class ), @@ -236,6 +250,8 @@ def get_help_text(self) -> str: required_permission_level=2, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, dm_action="poll banned", discord_action=lambda guild, member, reason, args: None, # Handled in base class ), @@ -247,7 +263,14 @@ def get_help_text(self) -> str: required_permission_level=2, supports_duration=False, supports_purge=False, + supports_reason=True, + supports_silent=True, dm_action="poll unbanned", discord_action=lambda guild, member, reason, args: None, # Handled in base class ), } + + +# This file is a data-only module; provide a noop setup so the cog loader is satisfied. +async def setup(bot): # type: ignore[unused-argument] + return diff --git a/tux/cogs/moderation/command_meta.py b/tux/cogs/moderation/command_meta.py new file mode 100644 index 000000000..0aad7378d --- /dev/null +++ b/tux/cogs/moderation/command_meta.py @@ -0,0 +1,179 @@ +"""Metaclass + base class for declarative moderation commands. + +Subclass `ModerationCommand` once per moderation action. Example:: + + class Ban(ModerationCommand): + name = "ban" + aliases = ["b"] + case_type = CaseType.BAN + required_pl = 3 + flags = { + "purge": dict(type=int, aliases=["p"], default=0, desc="Days to delete"), + "silent": dict(type=bool, aliases=["s", "quiet"], default=False, desc="No DM"), + } + + async def _action(self, guild: discord.Guild, member: discord.Member, *, flags, reason): + await guild.ban(member, reason=reason, delete_message_seconds=flags.purge * 86400) +""" +from __future__ import annotations + +from typing import Any, ClassVar, Dict, Type + +import discord +from discord.ext import commands + +from tux.utils.flag_factory import build_flag_converter +from tux.utils.transformers import MemberOrUser +from . import ModerationCogBase # relative import + +_REGISTRY: list[Type["ModerationCommand"]] = [] + + +class ModerationCommandMeta(type): + """Metaclass that turns class attributes into a real command.""" + + def __new__(mcls, name: str, bases: tuple[type, ...], ns: dict[str, Any]): + cls = super().__new__(mcls, name, bases, ns) + if cls.__name__ == "ModerationCommand": + return cls + + # Extract required attributes + cmd_name: str = getattr(cls, "name") # type: ignore[arg-type] + aliases: list[str] = getattr(cls, "aliases", []) # type: ignore[arg-type] + case_type = getattr(cls, "case_type") + required_pl: int = getattr(cls, "required_pl", 0) + flags_spec: Dict[str, Dict[str, Any]] = getattr(cls, "flags", {}) # type: ignore[arg-type] + description: str = getattr(cls, "description", cmd_name.title()) + + # Build FlagConverter + FlagsCls = build_flag_converter( + cmd_name, + duration="duration" in flags_spec, + purge="purge" in flags_spec, + silent="silent" in flags_spec, + ) + + # Make FlagsCls resolvable for annotation eval + if FlagsCls is not None: + import builtins # noqa: WPS433 + # Expose under a stable alias so eval("FlagsCls") always succeeds + globals()['FlagsCls'] = FlagsCls + setattr(builtins, 'FlagsCls', FlagsCls) + + # -------------------------------------------------- + # Shared executor + # -------------------------------------------------- + async def _run(self: ModerationCogBase, ctx, target: MemberOrUser, flags, reason: str): + if not await self.check_conditions(ctx, target, ctx.author, cmd_name): + return + + silent = getattr(flags, "silent", False) + duration = getattr(flags, "duration", None) + + action_coro = cls._action(self, ctx.guild, target, flags=flags, reason=reason) # type: ignore[arg-type] + actions = [(action_coro, type(None))] + + await self.execute_mod_action( + ctx=ctx, + case_type=case_type, + user=target, + reason=reason, + silent=silent, + dm_action=getattr(cls, "dm_action", cmd_name), + actions=actions, + duration=duration, + ) + + # -------------------------------------------------- + # Text command (prefix) + # -------------------------------------------------- + async def _text(self: ModerationCogBase, ctx: commands.Context, target: MemberOrUser, *, flags: FlagsCls | None = None, reason: str = "") -> None: # type: ignore[arg-type] + if flags is None: + flags = FlagsCls() # type: ignore[assignment] + await _run(self, ctx, target, flags, reason) + + if FlagsCls is not None: + _text.__globals__[FlagsCls.__name__] = FlagsCls + _text.__globals__['FlagsCls'] = FlagsCls # also as generic alias + from typing import Dict as _Dict # noqa: WPS433 + _text.__globals__.setdefault('Dict', _Dict) + + _text.__name__ = cmd_name + _text.__doc__ = description + + text_cmd = commands.command(name=cmd_name, aliases=aliases, help=description)(_text) + # Override usage string to exclude internal ctx parameter + text_cmd.usage = f"{cmd_name} " + + # -------------------------------------------------- + # Slash command + # -------------------------------------------------- + async def _slash( + interaction: discord.Interaction, + target: MemberOrUser, + *, + duration: str | None = None, + purge: int = 0, + silent: bool = False, + reason: str = "", + ) -> None: # type: ignore[arg-type] + """App command callback (no bound self) using explicit options.""" + + from types import SimpleNamespace # noqa: WPS433 + + flags_obj = SimpleNamespace(duration=duration, purge=purge, silent=silent) + + bot = interaction.client # type: ignore[attr-defined] + cog: ModerationCogBase | None = bot.get_cog("ModerationCommandsCog") # type: ignore[attr-defined] + if cog is None: + return + + ctx = await cog.bot.get_context(interaction) # type: ignore[attr-defined] + await _run(cog, ctx, target, flags_obj, reason) + + if FlagsCls is not None: + _slash.__globals__[FlagsCls.__name__] = FlagsCls + _slash.__globals__['FlagsCls'] = FlagsCls + + slash_cmd = discord.app_commands.command(name=cmd_name, description=description)(_slash) + + # store on cls + cls.text_command = text_cmd # type: ignore[attr-defined] + cls.slash_command = slash_cmd # type: ignore[attr-defined] + + # register class + _REGISTRY.append(cls) + return cls + + +class ModerationCommand(metaclass=ModerationCommandMeta): + """Base class to inherit for each moderation action.""" + + name: ClassVar[str] + aliases: ClassVar[list[str]] = [] + case_type: ClassVar[Any] + required_pl: ClassVar[int] = 0 + flags: ClassVar[Dict[str, Dict[str, Any]]] = {} + description: ClassVar[str] = "" + + # Child classes must implement _action + async def _action(self, guild: discord.Guild, member: discord.Member | discord.User, *, flags: Any, reason: str) -> None: # noqa: D401 + raise NotImplementedError + + +# Cog that loads all ModerationCommand subclasses +class ModerationCommandsCog(ModerationCogBase): + def __init__(self, bot: commands.Bot): + super().__init__(bot) # type: ignore[arg-type] + + for cls in _REGISTRY: + self.bot.add_command(cls.text_command) # type: ignore[attr-defined] + self.bot.tree.add_command(cls.slash_command) # type: ignore[attr-defined] + + +async def setup(bot: commands.Bot): + # Ensure all command modules are imported so subclasses register + import importlib + importlib.import_module("tux.cogs.moderation.commands") + + await bot.add_cog(ModerationCommandsCog(bot)) \ No newline at end of file diff --git a/tux/cogs/moderation/commands/__init__.py b/tux/cogs/moderation/commands/__init__.py new file mode 100644 index 000000000..308edd78d --- /dev/null +++ b/tux/cogs/moderation/commands/__init__.py @@ -0,0 +1,12 @@ +"""Import all moderation command modules so their metaclasses register them.""" +from importlib import import_module +from pkgutil import iter_modules +from pathlib import Path + +_pkg_path = Path(__file__).parent +for _finder, _name, _is_pkg in iter_modules([str(_pkg_path)]): + if _name.startswith("__"): + continue + import_module(f"{__name__}.{_name}") + +del import_module, iter_modules, Path, _pkg_path \ No newline at end of file diff --git a/tux/cogs/moderation/common.py b/tux/cogs/moderation/common.py new file mode 100644 index 000000000..3a27fbc2c --- /dev/null +++ b/tux/cogs/moderation/common.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Awaitable, Callable, TypeAlias + +import discord +from discord import app_commands +from discord.ext import commands +from prisma.enums import CaseType + +from tux.utils.transformers import MemberOrUser + +__all__ = [ + "ModCmdInfo", + "mod_command", +] + + +@dataclass(slots=True, frozen=True) +class ModCmdInfo: + """Static metadata for a moderation command.""" + + name: str + aliases: list[str] + description: str + case_type: CaseType + required_pl: int + dm_verb: str + supports_duration: bool = False + supports_purge: bool = False + supports_silent: bool = True + + +T_Context: TypeAlias = commands.Context # generic enough + + +# Signature of the wrapped coroutine (implemented in concrete cog) +WrappedFunc: TypeAlias = Callable[ + [T_Context, MemberOrUser], + Awaitable[None], +] + + +def mod_command(info: ModCmdInfo): + """Decorator that builds prefix *and* slash command versions. + + The decorated coroutine must be defined inside a subclass of + :class:`~tux.cogs.moderation.action_mixin.ModerationActionMixin` and must + accept the following signature:: + + async def cmd(self, ctx: commands.Context, member: MemberOrUser, *, + duration: str | None = None, purge: int = 0, + silent: bool = False, reason: str = ""): ... + + Unused parameters (e.g. *duration*) can be omitted if the corresponding + ``supports_*`` flag is *False*. + """ + + def decorator(func: Callable[..., Awaitable[None]]): + # ---------------- prefix command ---------------- + prefix_cmd = commands.command( + name=info.name, + aliases=info.aliases, + help=info.description, + )(func) + + # Provide clean usage string (no ctx) + prefix_cmd.usage = f"{info.name} " + + # ---------------- slash command ----------------- + # Build parameters dynamically based on supports_* flags + async def slash_callback( + interaction: discord.Interaction, + member: MemberOrUser, + *, + duration: str | None = None, + purge: int = 0, + silent: bool = False, + reason: str = "", + ) -> None: # noqa: D401 + bot = interaction.client + assert isinstance(bot, commands.Bot) + ctx = await bot.get_context(interaction) + await func( + ctx, # type: ignore[arg-type] + member, + duration=duration, + purge=purge, + silent=silent, + reason=reason, + ) + + # Build the slash command object + slash_cmd = app_commands.command(name=info.name, description=info.description)(slash_callback) + + # Attach metadata + setattr(prefix_cmd, "mod_info", info) + setattr(slash_cmd, "mod_info", info) + + # Return both commands so the caller can register them + return prefix_cmd, slash_cmd + + return decorator + + +# --------------------------------------------------------------------------- +# No-op setup so the cog loader doesn’t treat this util as a failing extension +# --------------------------------------------------------------------------- + + +async def setup(bot): # type: ignore[unused-argument] + """Utility module – nothing to load.""" + return \ No newline at end of file diff --git a/tux/cogs/moderation/dynamic_moderation.py b/tux/cogs/moderation/dynamic_moderation.py deleted file mode 100644 index 80d8bc9a8..000000000 --- a/tux/cogs/moderation/dynamic_moderation.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -Dynamic moderation command system demonstration. - -This module demonstrates how moderation commands can be implemented -using a unified approach with the mixed_args system. -""" - -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.mixed_args import generate_mixed_usage, parse_mixed_arguments - -from . import ModerationCogBase - - -class DynamicModerationCog(ModerationCogBase): - """ - Dynamic moderation cog that demonstrates the unified approach. - - This cog shows how moderation commands can be implemented - using the mixed_args system for consistent argument parsing. - """ - - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - # Set usage string for dtimeout command - self.dtimeout.usage = generate_mixed_usage("dtimeout", ["member"], ["duration", "reason"], ["-d", "-s"]) - - @commands.hybrid_command( - name="dtimeout", - aliases=["dt", "dto", "dmute", "dm"], - description="Dynamic timeout command using mixed_args", - ) - @commands.guild_only() - @checks.has_pl(2) - async def dtimeout( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - mixed_args: str = "", - ) -> None: - """ - Timeout a member using dynamic mixed arguments. - - Supports both positional and flag-based arguments: - - Positional: `dtimeout @user 14d reason` - - Flag-based: `dtimeout @user reason -d 14d` - - Mixed: `dtimeout @user 14d reason -s` - """ - assert ctx.guild - - # Check if member is already timed out - if member.is_timed_out(): - await ctx.send(f"{member} is already timed out.", ephemeral=True) - return - - # Check if moderator has permission to timeout the member - if not await self.check_conditions(ctx, member, ctx.author, "timeout"): - return - - # Parse mixed arguments - parsed_args = parse_mixed_arguments(mixed_args) - - # Extract values with defaults - duration = parsed_args.get("duration") - reason = parsed_args.get("reason", "No reason provided") - silent = parsed_args.get("silent", False) - - # Validate that we have a duration - if not duration: - await ctx.send("Please provide a duration for the timeout.", ephemeral=True) - return - - # Execute the timeout action - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.TIMEOUT, - user=member, - reason=reason, - silent=silent, - dm_action="timed out", - actions=[ - (member.timeout(duration, reason=reason), type(None)), - ], - ) - - -async def setup(bot: Tux) -> None: - """Set up the dynamic moderation cog.""" - await bot.add_cog(DynamicModerationCog(bot)) diff --git a/tux/cogs/moderation/jail.py b/tux/cogs/moderation/jail.py deleted file mode 100644 index 89ddf0664..000000000 --- a/tux/cogs/moderation/jail.py +++ /dev/null @@ -1,213 +0,0 @@ -import discord -from discord.ext import commands -from loguru import logger - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import JailFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class Jail(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.jail.usage = generate_usage(self.jail, JailFlags) - - async def get_jail_role(self, guild: discord.Guild) -> discord.Role | None: - """ - Get the jail role for the guild. - - Parameters - ---------- - guild : discord.Guild - The guild to get the jail role for. - - Returns - ------- - discord.Role | None - The jail role, or None if not found. - """ - jail_role_id = await self.db.guild_config.get_jail_role_id(guild.id) - return None if jail_role_id is None else guild.get_role(jail_role_id) - - async def get_jail_channel(self, guild: discord.Guild) -> discord.TextChannel | None: - """ - Get the jail channel for the guild. - """ - jail_channel_id = await self.db.guild_config.get_jail_channel_id(guild.id) - channel = guild.get_channel(jail_channel_id) if jail_channel_id is not None else None - return channel if isinstance(channel, discord.TextChannel) else None - - async def is_jailed(self, guild_id: int, user_id: int) -> bool: - """ - Check if a user is jailed. - - Parameters - ---------- - guild_id : int - The ID of the guild to check in. - user_id : int - The ID of the user to check. - - Returns - ------- - bool - True if the user is jailed, False otherwise. - """ - # Get latest case for this user (more efficient than counting all cases) - latest_case = await self.db.case.get_latest_case_by_user( - guild_id=guild_id, - user_id=user_id, - case_types=[CaseType.JAIL, CaseType.UNJAIL], - ) - - # If no cases exist or latest case is an unjail, user is not jailed - return bool(latest_case and latest_case.case_type == CaseType.JAIL) - - @commands.hybrid_command( - name="jail", - aliases=["j"], - ) - @commands.guild_only() - @checks.has_pl(2) - async def jail( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: JailFlags, - ) -> None: - """ - Jail a member in the server. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context in which the command is being invoked. - member : discord.Member - The member to jail. - flags : JailFlags - The flags for the command. (reason: str, silent: bool) - - Raises - ------ - discord.Forbidden - If the bot is unable to jail the user. - discord.HTTPException - If an error occurs while jailing the user. - """ - - assert ctx.guild - - await ctx.defer(ephemeral=True) - - # Get jail role - jail_role = await self.get_jail_role(ctx.guild) - if not jail_role: - await ctx.send("No jail role found.", ephemeral=True) - return - - # Get jail channel - jail_channel = await self.get_jail_channel(ctx.guild) - if not jail_channel: - await ctx.send("No jail channel found.", ephemeral=True) - return - - # Check if user is already jailed - if await self.is_jailed(ctx.guild.id, member.id): - await ctx.send("User is already jailed.", ephemeral=True) - return - - # Check if moderator has permission to jail the member - if not await self.check_conditions(ctx, member, ctx.author, "jail"): - return - - # Use a transaction-like pattern to ensure consistency - try: - # Get roles that can be managed by the bot - user_roles = self._get_manageable_roles(member, jail_role) - - # Convert roles to IDs - case_user_roles = [role.id for role in user_roles] - - # First create the case - if this fails, no role changes are made - case = await self.db.case.insert_case( - guild_id=ctx.guild.id, - case_user_id=member.id, - case_moderator_id=ctx.author.id, - case_type=CaseType.JAIL, - case_reason=flags.reason, - case_user_roles=case_user_roles, - ) - - # Add jail role immediately - this is the most important part - await member.add_roles(jail_role, reason=flags.reason) - - # Send DM to member - dm_sent = await self.send_dm(ctx, flags.silent, member, flags.reason, "jailed") - - # Handle case response - send embed immediately - await self.handle_case_response(ctx, CaseType.JAIL, case.case_number, flags.reason, member, dm_sent) - - # Remove old roles in the background after sending the response - if user_roles: - try: - # Try to remove all at once for efficiency - await member.remove_roles(*user_roles, reason=flags.reason) - except Exception as e: - logger.warning( - f"Failed to remove all roles at once from {member}, falling back to individual removal: {e}", - ) - # Fall back to removing one by one - for role in user_roles: - try: - await member.remove_roles(role, reason=flags.reason) - except Exception as role_e: - logger.error(f"Failed to remove role {role} from {member}: {role_e}") - # Continue with other roles even if one fails - - except Exception as e: - logger.error(f"Failed to jail {member}: {e}") - await ctx.send(f"Failed to jail {member}: {e}", ephemeral=True) - return - - @staticmethod - def _get_manageable_roles( - member: discord.Member, - jail_role: discord.Role, - ) -> list[discord.Role]: - """ - Get the roles that can be managed by the bot. - - Parameters - ---------- - member : discord.Member - The member to jail. - jail_role : discord.Role - The jail role. - - Returns - ------- - list[discord.Role] - A list of roles that can be managed by the bot. - """ - - return [ - role - for role in member.roles - if not ( - role.is_bot_managed() - or role.is_premium_subscriber() - or role.is_integration() - or role.is_default() - or role == jail_role - ) - and role.is_assignable() - ] - - -async def setup(bot: Tux) -> None: - await bot.add_cog(Jail(bot)) diff --git a/tux/cogs/moderation/kick.py b/tux/cogs/moderation/kick.py deleted file mode 100644 index 4b37bc4ff..000000000 --- a/tux/cogs/moderation/kick.py +++ /dev/null @@ -1,69 +0,0 @@ -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import KickFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class Kick(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.kick.usage = generate_usage(self.kick, KickFlags) - - @commands.hybrid_command( - name="kick", - aliases=["k"], - ) - @commands.guild_only() - @checks.has_pl(2) - async def kick( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: KickFlags, - ) -> None: - """ - Kick a member from the server. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context in which the command is being invoked. - member : discord.Member - The member to kick. - flags : KickFlags - The flags for the command. (reason: str, silent: bool) - - Raises - ------ - discord.Forbidden - If the bot is unable to kick the user. - discord.HTTPException - If an error occurs while kicking the user. - """ - assert ctx.guild - - # Check if moderator has permission to kick the member - if not await self.check_conditions(ctx, member, ctx.author, "kick"): - return - - # Execute kick with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.KICK, - user=member, - reason=flags.reason, - silent=flags.silent, - dm_action="kicked", - actions=[(ctx.guild.kick(member, reason=flags.reason), type(None))], - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(Kick(bot)) diff --git a/tux/cogs/moderation/pollban.py b/tux/cogs/moderation/pollban.py deleted file mode 100644 index bca4ad61f..000000000 --- a/tux/cogs/moderation/pollban.py +++ /dev/null @@ -1,68 +0,0 @@ -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import PollBanFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class PollBan(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.poll_ban.usage = generate_usage(self.poll_ban, PollBanFlags) - - @commands.hybrid_command( - name="pollban", - aliases=["pb"], - ) - @commands.guild_only() - @checks.has_pl(3) - async def poll_ban( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: PollBanFlags, - ) -> None: - """ - Ban a user from creating polls. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context object. - member : discord.Member - The member to poll ban. - flags : PollBanFlags - The flags for the command. (reason: str, silent: bool) - """ - assert ctx.guild - - # Check if user is already poll banned - if await self.is_pollbanned(ctx.guild.id, member.id): - await ctx.send("User is already poll banned.", ephemeral=True) - return - - # Check if moderator has permission to poll ban the member - if not await self.check_conditions(ctx, member, ctx.author, "poll ban"): - return - - # Execute poll ban with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.POLLBAN, - user=member, - reason=flags.reason, - silent=flags.silent, - dm_action="poll banned", - # Use dummy coroutine for actions that don't need Discord API calls - actions=[(self._dummy_action(), type(None))], - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(PollBan(bot)) diff --git a/tux/cogs/moderation/pollunban.py b/tux/cogs/moderation/pollunban.py deleted file mode 100644 index 7de595528..000000000 --- a/tux/cogs/moderation/pollunban.py +++ /dev/null @@ -1,68 +0,0 @@ -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import PollUnbanFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class PollUnban(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.poll_unban.usage = generate_usage(self.poll_unban, PollUnbanFlags) - - @commands.hybrid_command( - name="pollunban", - aliases=["pub"], - ) - @commands.guild_only() - @checks.has_pl(3) - async def poll_unban( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: PollUnbanFlags, - ) -> None: - """ - Remove a poll ban from a member. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context object. - member : discord.Member - The member to remove poll ban from. - flags : PollUnbanFlags - The flags for the command. (reason: str, silent: bool) - """ - assert ctx.guild - - # Check if user is poll banned - if not await self.is_pollbanned(ctx.guild.id, member.id): - await ctx.send("User is not poll banned.", ephemeral=True) - return - - # Check if moderator has permission to poll unban the member - if not await self.check_conditions(ctx, member, ctx.author, "poll unban"): - return - - # Execute poll unban with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.POLLUNBAN, - user=member, - reason=flags.reason, - silent=flags.silent, - dm_action="poll unbanned", - # Use dummy coroutine for actions that don't need Discord API calls - actions=[(self._dummy_action(), type(None))], - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(PollUnban(bot)) diff --git a/tux/cogs/moderation/snippetban.py b/tux/cogs/moderation/snippetban.py deleted file mode 100644 index 2b90fc696..000000000 --- a/tux/cogs/moderation/snippetban.py +++ /dev/null @@ -1,68 +0,0 @@ -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import SnippetBanFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class SnippetBan(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.snippet_ban.usage = generate_usage(self.snippet_ban, SnippetBanFlags) - - @commands.hybrid_command( - name="snippetban", - aliases=["sb"], - ) - @commands.guild_only() - @checks.has_pl(3) - async def snippet_ban( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: SnippetBanFlags, - ) -> None: - """ - Ban a member from creating snippets. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context object. - member : discord.Member - The member to snippet ban. - flags : SnippetBanFlags - The flags for the command. (reason: str, silent: bool) - """ - assert ctx.guild - - # Check if user is already snippet banned - if await self.is_snippetbanned(ctx.guild.id, member.id): - await ctx.send("User is already snippet banned.", ephemeral=True) - return - - # Check if moderator has permission to snippet ban the member - if not await self.check_conditions(ctx, member, ctx.author, "snippet ban"): - return - - # Execute snippet ban with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.SNIPPETBAN, - user=member, - reason=flags.reason, - silent=flags.silent, - dm_action="snippet banned", - # Use dummy coroutine for actions that don't need Discord API calls - actions=[(self._dummy_action(), type(None))], - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(SnippetBan(bot)) diff --git a/tux/cogs/moderation/snippetunban.py b/tux/cogs/moderation/snippetunban.py deleted file mode 100644 index 59179bb76..000000000 --- a/tux/cogs/moderation/snippetunban.py +++ /dev/null @@ -1,68 +0,0 @@ -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import SnippetUnbanFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class SnippetUnban(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.snippet_unban.usage = generate_usage(self.snippet_unban, SnippetUnbanFlags) - - @commands.hybrid_command( - name="snippetunban", - aliases=["sub"], - ) - @commands.guild_only() - @checks.has_pl(3) - async def snippet_unban( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: SnippetUnbanFlags, - ) -> None: - """ - Remove a snippet ban from a member. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context object. - member : discord.Member - The member to remove snippet ban from. - flags : SnippetUnbanFlags - The flags for the command. (reason: str, silent: bool) - """ - assert ctx.guild - - # Check if user is snippet banned - if not await self.is_snippetbanned(ctx.guild.id, member.id): - await ctx.send("User is not snippet banned.", ephemeral=True) - return - - # Check if moderator has permission to snippet unban the member - if not await self.check_conditions(ctx, member, ctx.author, "snippet unban"): - return - - # Execute snippet unban with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.SNIPPETUNBAN, - user=member, - reason=flags.reason, - silent=flags.silent, - dm_action="snippet unbanned", - # Use dummy coroutine for actions that don't need Discord API calls - actions=[(self._dummy_action(), type(None))], - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(SnippetUnban(bot)) diff --git a/tux/cogs/moderation/tempban.py b/tux/cogs/moderation/tempban.py deleted file mode 100644 index c6543b817..000000000 --- a/tux/cogs/moderation/tempban.py +++ /dev/null @@ -1,264 +0,0 @@ -from datetime import UTC, datetime, timedelta - -import discord -from discord.ext import commands, tasks -from loguru import logger - -from prisma.enums import CaseType -from prisma.models import Case -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import TempBanFlags -from tux.utils.functions import parse_time_string -from tux.utils.mixed_args import generate_mixed_usage, is_duration - -from . import ModerationCogBase - - -class TempBan(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - # Generate flexible usage that shows both formats - self.tempban.usage = generate_mixed_usage( - "tempban", - ["member"], - ["duration", "reason"], - ["-d duration", "-p purge", "-s"], - ) - self._processing_tempbans = False # Lock to prevent overlapping task runs - self.tempban_check.start() - - @commands.hybrid_command(name="tempban", aliases=["tb"]) - @commands.guild_only() - @checks.has_pl(3) - async def tempban( - self, - ctx: commands.Context[Tux], - member: discord.Member, - duration_or_reason: str | None = None, - *, - flags: TempBanFlags | None = None, - ) -> None: - """ - Temporarily ban a member from the server. - - Supports both positional and flag-based arguments: - - Positional: `tempban @user 14d reason` - - Flag-based: `tempban @user reason -d 14d` - - Mixed: `tempban @user 14d reason -s` - - Parameters - ---------- - ctx : commands.Context[Tux] - The context in which the command is being invoked. - member : discord.Member - The member to ban. - duration_or_reason : Optional[str] - Either a duration (e.g., "14d") or reason if using positional format. - flags : Optional[TempBanFlags] - The flags for the command. (duration: float (via converter), purge: int (< 7), silent: bool) - - Raises - ------ - discord.Forbidden - If the bot is unable to ban the user. - discord.HTTPException - If an error occurs while banning the user. - """ - - assert ctx.guild - - # Check if moderator has permission to temp ban the member - if not await self.check_conditions(ctx, member, ctx.author, "temp ban"): - return - - # Parse arguments - support both positional and flag formats - duration = None - reason = None - silent = False - purge = 0 - - # Check if duration_or_reason is a duration (time pattern) - if duration_or_reason and is_duration(duration_or_reason): - duration = duration_or_reason - # If flags are provided, use them for reason, silent, and purge - if flags: - reason = flags.reason - silent = flags.silent - purge = flags.purge - else: - # No flags provided, assume remaining arguments are reason - reason = "No reason provided" - else: - # duration_or_reason is not a duration, treat as reason - if duration_or_reason: - reason = duration_or_reason - elif flags: - reason = flags.reason - else: - reason = "No reason provided" - - # Use flags for duration, silent, and purge if provided - if flags: - duration = str(flags.duration) if flags.duration else None - silent = flags.silent - purge = flags.purge - - # Validate that we have a duration - if not duration: - await ctx.send("Duration is required. Use format like '14d', '1h', etc.", ephemeral=True) - return - - # Parse and validate duration - try: - parsed_duration = parse_time_string(duration) - duration_seconds = parsed_duration.total_seconds() - except ValueError as e: - await ctx.send(f"Invalid duration format: {e}", ephemeral=True) - return - - # Calculate expiration datetime from duration - expires_at = datetime.now(UTC) + timedelta(seconds=duration_seconds) - - # Create a simple duration string for logging/display - duration_display_str = str(timedelta(seconds=int(duration_seconds))) - - # Execute tempban with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.TEMPBAN, - user=member, - reason=reason, - silent=silent, - dm_action="temp banned", - actions=[ - (ctx.guild.ban(member, reason=reason, delete_message_seconds=purge * 86400), type(None)), - ], - duration=duration_display_str, # Pass readable string for logging - expires_at=expires_at, # Pass calculated expiration datetime - ) - - async def _process_tempban_case(self, case: Case) -> tuple[int, int]: - """Process an individual tempban case. Returns (processed_cases, failed_cases).""" - - # Check for essential data first - if not (case.guild_id and case.case_user_id and case.case_id): - logger.error(f"Invalid case data: {case}") - return 0, 0 - - guild = self.bot.get_guild(case.guild_id) - if not guild: - logger.warning(f"Guild {case.guild_id} not found for case {case.case_id}") - return 0, 0 - - # Check ban status - try: - await guild.fetch_ban(discord.Object(id=case.case_user_id)) - # If fetch_ban succeeds without error, the user IS banned. - except discord.NotFound: - # User is not banned. Mark expired and consider processed. - await self.db.case.set_tempban_expired(case.case_id, case.guild_id) - return 1, 0 - except Exception as e: - # Log error during ban check, but proceed to attempt unban anyway - # This matches the original logic's behavior. - logger.warning(f"Error checking ban status for {case.case_user_id} in {guild.id}: {e}") - - # Attempt to unban (runs if user was found banned or if ban check failed) - processed_count, failed_count = 0, 0 - try: - # Perform the unban - await guild.unban( - discord.Object(id=case.case_user_id), - reason="Temporary ban expired.", - ) - except (discord.Forbidden, discord.HTTPException) as e: - # Discord API unban failed - logger.error(f"Failed to unban {case.case_user_id} in {guild.id}: {e}") - failed_count = 1 - except Exception as e: - # Catch other potential errors during unban - logger.error( - f"Unexpected error during unban attempt for tempban {case.case_id} (user {case.case_user_id}, guild {guild.id}): {e}", - ) - failed_count = 1 - else: - # Unban successful, now update the database - try: - update_result = await self.db.case.set_tempban_expired(case.case_id, case.guild_id) - - if update_result == 1: - logger.info( - f"Successfully unbanned user {case.case_user_id} and marked case {case.case_id} as expired in guild {guild.id}.", - ) - processed_count = 1 - elif update_result is None: - logger.info( - f"Successfully unbanned user {case.case_user_id} in guild {guild.id} (case {case.case_id} was already marked expired).", - ) - processed_count = 1 # Still count as success - else: - logger.error( - f"Unexpected update result ({update_result}) when marking case {case.case_id} as expired for user {case.case_user_id} in guild {guild.id}.", - ) - failed_count = 1 - except Exception as e: - # Catch errors during DB update - logger.error( - f"Unexpected error during DB update for tempban {case.case_id} (user {case.case_user_id}, guild {guild.id}): {e}", - ) - failed_count = 1 - - return processed_count, failed_count - - @tasks.loop(minutes=1) - async def tempban_check(self) -> None: - """ - Check for expired tempbans at a set interval and unban the user if the ban has expired. - - Uses a simple locking mechanism to prevent overlapping executions. - Processes bans in smaller batches to prevent timeout issues. - - Raises - ------ - Exception - If an error occurs while checking for expired tempbans. - """ - # Skip if already processing - if self._processing_tempbans: - return - - try: - self._processing_tempbans = True - - # Get expired tempbans - expired_cases = await self.db.case.get_expired_tempbans() - processed_cases = 0 - failed_cases = 0 - - for case in expired_cases: - # Process each case using the helper method - processed, failed = await self._process_tempban_case(case) - processed_cases += processed - failed_cases += failed - - if processed_cases > 0 or failed_cases > 0: - logger.info(f"Tempban check: processed {processed_cases} cases, {failed_cases} failures") - - except Exception as e: - logger.error(f"Failed to check tempbans: {e}") - finally: - self._processing_tempbans = False - - @tempban_check.before_loop - async def before_tempban_check(self) -> None: - """Wait for the bot to be ready before starting the loop.""" - await self.bot.wait_until_ready() - - async def cog_unload(self) -> None: - """Cancel the tempban check loop when the cog is unloaded.""" - self.tempban_check.cancel() - - -async def setup(bot: Tux) -> None: - await bot.add_cog(TempBan(bot)) diff --git a/tux/cogs/moderation/timeout.py b/tux/cogs/moderation/timeout.py deleted file mode 100644 index 6f3821251..000000000 --- a/tux/cogs/moderation/timeout.py +++ /dev/null @@ -1,137 +0,0 @@ -import datetime - -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import TimeoutFlags -from tux.utils.functions import parse_time_string -from tux.utils.mixed_args import generate_mixed_usage, is_duration - -from . import ModerationCogBase - - -class Timeout(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - # Generate flexible usage that shows both formats - self.timeout.usage = generate_mixed_usage("timeout", ["member"], ["duration", "reason"], ["-d duration", "-s"]) - - @commands.hybrid_command( - name="timeout", - aliases=["t", "to", "mute", "m"], - ) - @commands.guild_only() - @checks.has_pl(2) - async def timeout( - self, - ctx: commands.Context[Tux], - member: discord.Member, - duration_or_reason: str | None = None, - *, - flags: TimeoutFlags | None = None, - ) -> None: - """ - Timeout a member from the server. - - Supports both positional and flag-based arguments: - - Positional: `timeout @user 14d reason` - - Flag-based: `timeout @user reason -d 14d` - - Mixed: `timeout @user 14d reason -s` - - Parameters - ---------- - ctx : commands.Context[Tux] - The context in which the command is being invoked. - member : discord.Member - The member to timeout. - duration_or_reason : Optional[str] - Either a duration (e.g., "14d") or reason if using positional format. - flags : Optional[TimeoutFlags] - The flags for the command (duration: str, silent: bool). - - Raises - ------ - discord.DiscordException - If an error occurs while timing out the user. - """ - assert ctx.guild - - # Check if member is already timed out - if member.is_timed_out(): - await ctx.send(f"{member} is already timed out.", ephemeral=True) - return - - # Check if moderator has permission to timeout the member - if not await self.check_conditions(ctx, member, ctx.author, "timeout"): - return - - # Parse arguments - support both positional and flag formats - duration = None - reason = None - silent = False - - # Check if duration_or_reason is a duration (time pattern) - if duration_or_reason and is_duration(duration_or_reason): - duration = duration_or_reason - # If flags are provided, use them for reason and silent - if flags: - reason = flags.reason - silent = flags.silent - else: - # No flags provided, assume remaining arguments are reason - reason = "No reason provided" - else: - # duration_or_reason is not a duration, treat as reason - if duration_or_reason: - reason = duration_or_reason - elif flags: - reason = flags.reason - else: - reason = "No reason provided" - - # Use flags for duration and silent if provided - if flags: - duration = flags.duration - silent = flags.silent - - # Validate that we have a duration - if not duration: - await ctx.send("Duration is required. Use format like '14d', '1h', etc.", ephemeral=True) - return - - # Parse and validate duration - try: - parsed_duration = parse_time_string(duration) - - # Discord maximum timeout duration is 28 days - max_duration = datetime.timedelta(days=28) - if parsed_duration > max_duration: - await ctx.send( - "Timeout duration exceeds Discord's maximum of 28 days. Setting timeout to maximum allowed (28 days).", - ephemeral=True, - ) - parsed_duration = max_duration - # Update the display duration for consistency - duration = "28d" - except ValueError as e: - await ctx.send(f"Invalid duration format: {e}", ephemeral=True) - return - - # Execute timeout with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.TIMEOUT, - user=member, - reason=reason, - silent=silent, - dm_action=f"timed out for {duration}", - actions=[(member.timeout(parsed_duration, reason=reason), type(None))], - duration=duration, - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(Timeout(bot)) diff --git a/tux/cogs/moderation/unban.py b/tux/cogs/moderation/unban.py deleted file mode 100644 index c2fc5a6f4..000000000 --- a/tux/cogs/moderation/unban.py +++ /dev/null @@ -1,164 +0,0 @@ -from contextlib import suppress - -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.constants import CONST -from tux.utils.flags import UnbanFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class Unban(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.unban.usage = generate_usage(self.unban, UnbanFlags) - - async def resolve_user_from_ban_list(self, ctx: commands.Context[Tux], identifier: str) -> discord.User | None: - """ - Resolve a user from the ban list using username, ID, or partial info. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context of the command. - identifier : str - The username, ID, or partial identifier to resolve. - - Returns - ------- - Optional[discord.User] - The user if found, None otherwise. - """ - assert ctx.guild - - # Get the list of banned users - banned_users = [ban.user async for ban in ctx.guild.bans()] - - # Try ID first - with suppress(ValueError): - user_id = int(identifier) - for user in banned_users: - if user.id == user_id: - return user - - # Try exact username or username#discriminator matching - for user in banned_users: - if user.name.lower() == identifier.lower(): - return user - if str(user).lower() == identifier.lower(): - return user - - # Try partial name matching - identifier_lower = identifier.lower() - matches = [user for user in banned_users if identifier_lower in user.name.lower()] - - return matches[0] if len(matches) == 1 else None - - # New private method extracted from the nested function - async def _perform_unban( - self, - ctx: commands.Context[Tux], - user: discord.User, - final_reason: str, - guild: discord.Guild, # Pass guild explicitly - ) -> None: - """Executes the core unban action and case creation.""" - # We already checked that user is not None in the main command - assert user is not None, "User cannot be None at this point" - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.UNBAN, - user=user, - reason=final_reason, - silent=True, # No DM for unbans due to user not being in the guild - dm_action="", # No DM for unbans - actions=[(guild.unban(user, reason=final_reason), type(None))], # Use passed guild - ) - - @commands.hybrid_command( - name="unban", - aliases=["ub"], - ) - @commands.guild_only() - @checks.has_pl(3) - async def unban( - self, - ctx: commands.Context[Tux], - username_or_id: str, - reason: str | None = None, - *, - flags: UnbanFlags, - ) -> None: - """ - Unban a user from the server. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context object for the command. - username_or_id : str - The username or ID of the user to unban. - reason : Optional[str] - The reason for the unban. - flags : UnbanFlags - The flags for the command. - - Raises - ------ - discord.Forbidden - If the bot does not have the necessary permissions. - discord.HTTPException - If an error occurs while unbanning the user. - """ - assert ctx.guild - - await ctx.defer(ephemeral=True) - - # First, try standard user conversion - try: - user = await commands.UserConverter().convert(ctx, username_or_id) - except commands.UserNotFound: - # If that fails, try more flexible ban list matching - user = await self.resolve_user_from_ban_list(ctx, username_or_id) - if not user: - await self.send_error_response( - ctx, - f"Could not find '{username_or_id}' in the ban list. Try using the exact username or ID.", - ) - return - - # Check if the user is banned - try: - await ctx.guild.fetch_ban(user) - except discord.NotFound: - await self.send_error_response(ctx, f"{user} is not banned.") - return - - # Check if moderator has permission to unban the user - if not await self.check_conditions(ctx, user, ctx.author, "unban"): - return - - final_reason = reason or CONST.DEFAULT_REASON - guild = ctx.guild - - try: - # Call the lock executor with a lambda referencing the new private method - await self.execute_user_action_with_lock( - user.id, - lambda: self._perform_unban(ctx, user, final_reason, guild), - ) - except discord.NotFound: - # This might occur if the user was unbanned between the fetch_ban check and the lock acquisition - await self.send_error_response(ctx, f"{user} is no longer banned.") - except discord.HTTPException as e: - # Catch potential errors during the unban action forwarded by execute_mod_action - await self.send_error_response(ctx, f"Failed to unban {user}", e) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(Unban(bot)) diff --git a/tux/cogs/moderation/unjail.py b/tux/cogs/moderation/unjail.py deleted file mode 100644 index 761b0bbee..000000000 --- a/tux/cogs/moderation/unjail.py +++ /dev/null @@ -1,284 +0,0 @@ -import asyncio - -import discord -from discord.ext import commands -from loguru import logger - -from prisma.enums import CaseType -from prisma.models import Case -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import UnjailFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class Unjail(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.unjail.usage = generate_usage(self.unjail, UnjailFlags) - - async def get_jail_role(self, guild: discord.Guild) -> discord.Role | None: - """ - Get the jail role for the guild. - - Parameters - ---------- - guild : discord.Guild - The guild to get the jail role for. - - Returns - ------- - Optional[discord.Role] - The jail role, or None if not found. - """ - - jail_role_id = await self.db.guild_config.get_jail_role_id(guild.id) - return None if jail_role_id is None else guild.get_role(jail_role_id) - - async def get_latest_jail_case(self, guild_id: int, user_id: int) -> Case | None: - """ - Get the latest jail case for a user. - - Parameters - ---------- - guild_id : int - The ID of the guild to check in. - user_id : int - The ID of the user to check. - - Returns - ------- - Optional[Case] - The latest jail case, or None if not found. - """ - - return await self.db.case.get_latest_case_by_user( - guild_id=guild_id, - user_id=user_id, - case_types=[CaseType.JAIL], - ) - - async def restore_roles( - self, - member: discord.Member, - role_ids: list[int], - reason: str, - ) -> tuple[bool, list[discord.Role]]: - """ - Restore roles to a member with error handling. - - Parameters - ---------- - member : discord.Member - The member to restore roles to. - role_ids : List[int] - The IDs of the roles to restore. - reason : str - The reason for restoring the roles. - - Returns - ------- - Tuple[bool, List[discord.Role]] - A tuple containing whether the operation was successful and which roles were restored. - """ - - if not role_ids: - return True, [] - - # Filter out roles that no longer exist or can't be assigned - guild = member.guild - roles_to_add: list[discord.Role] = [] - skipped_roles: list[int] = [] - - for role_id in role_ids: - role = guild.get_role(role_id) - if role and role.is_assignable(): - roles_to_add.append(role) - else: - skipped_roles.append(role_id) - - if skipped_roles: - logger.warning( - f"Skipping {len(skipped_roles)} roles that don't exist or can't be assigned: {skipped_roles}", - ) - - if not roles_to_add: - return True, [] - - # Try to add all roles at once - try: - await member.add_roles(*roles_to_add, reason=reason) - - except discord.Forbidden: - logger.error(f"No permission to add roles to {member}") - return False, [] - - except discord.HTTPException as e: - # If bulk add fails, try one by one - logger.warning(f"Bulk role add failed for {member}, trying one by one: {e}") - successful_roles: list[discord.Role] = [] - - for role in roles_to_add: - try: - await member.add_roles(role, reason=reason) - successful_roles.append(role) - - except Exception as role_e: - logger.error(f"Failed to add role {role} to {member}: {role_e}") - - return bool(successful_roles), successful_roles - - else: - return True, roles_to_add - - @commands.hybrid_command( - name="unjail", - aliases=["uj"], - ) - @commands.guild_only() - @checks.has_pl(2) - async def unjail( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: UnjailFlags, - ) -> None: - """ - Remove a member from jail. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context in which the command is being invoked. - member : discord.Member - The member to unjail. - flags : UnjailFlags - The flags for the command. (reason: str, silent: bool) - - Raises - ------ - discord.Forbidden - If the bot is unable to unjail the user. - discord.HTTPException - If an error occurs while unjailing the user. - """ - - assert ctx.guild - - await ctx.defer(ephemeral=True) - - # Get jail role - jail_role = await self.get_jail_role(ctx.guild) - if not jail_role: - await self.send_error_response(ctx, "No jail role found.") - return - - # Check if user is jailed - if not await self.is_jailed(ctx.guild.id, member.id): - await self.send_error_response(ctx, "User is not jailed.") - return - - # Check if moderator has permission to unjail the member - if not await self.check_conditions(ctx, member, ctx.author, "unjail"): - return - - # Use lock to prevent race conditions - async def perform_unjail() -> None: - nonlocal ctx, member, jail_role, flags - - # Re-assert guild is not None inside the nested function for type safety - assert ctx.guild is not None, "Guild context should exist here" - guild_id = ctx.guild.id - - # Get latest jail case *before* modifying roles - case = await self.get_latest_jail_case(guild_id, member.id) - if not case: - await self.send_error_response(ctx, "No jail case found.") - return - - # Wrap core actions in try/except as suggested - try: - # Remove jail role from member - assert jail_role is not None, "Jail role should not be None at this point" - await member.remove_roles(jail_role, reason=flags.reason) - logger.info(f"Removed jail role from {member} by {ctx.author}") - - # Insert unjail case into database - case_result = await self.db.case.insert_case( - case_user_id=member.id, - case_moderator_id=ctx.author.id, - case_type=CaseType.UNJAIL, - case_reason=flags.reason, - guild_id=guild_id, - ) - - # Send DM to member - dm_sent = await self.send_dm(ctx, flags.silent, member, flags.reason, "removed from jail") - - # Handle case response - send embed immediately - await self.handle_case_response( - ctx, - CaseType.UNJAIL, - case_result.case_number, - flags.reason, - member, - dm_sent, - ) - - # Add roles back to member after sending the response - if case.case_user_roles: - success, restored_roles = await self.restore_roles(member, case.case_user_roles, flags.reason) - if success and restored_roles: - logger.info(f"Restored {len(restored_roles)} roles to {member}") - - # Restore the role verification logic here - # Shorter wait time for roles to be applied by Discord - await asyncio.sleep(0.5) - - # Verify if all roles were successfully added back - # Check ctx.guild again for safety within this block - if ctx.guild and case.case_user_roles: - # Check for missing roles in a simpler way - member_role_ids = {role.id for role in member.roles} - missing_roles: list[str] = [] - - for role_id in case.case_user_roles: - if role_id not in member_role_ids: - role = ctx.guild.get_role(role_id) - role_name = role.name if role else str(role_id) - missing_roles.append(role_name) - - if missing_roles: - missing_str = ", ".join(missing_roles) - logger.warning(f"Failed to restore roles for {member}: {missing_str}") - # Optionally notify moderator/user if roles failed to restore - # Example: await ctx.send(f"Note: Some roles couldn't be restored: {missing_str}", ephemeral=True) - - elif not restored_roles: - logger.warning( - f"No roles to restore for {member} or restore action failed partially/completely.", - ) - - except (discord.Forbidden, discord.HTTPException) as e: - # Specific Discord API errors during role removal or subsequent actions - error_message = f"Failed to unjail {member}: Discord API error." - logger.error(f"{error_message} Details: {e}") - await self.send_error_response(ctx, error_message, e) - # No specific rollback needed, but ensure case is not created/logged incorrectly if needed - - except Exception as e: - # Catch any other unexpected error - error_message = f"An unexpected error occurred while unjailing {member}." - logger.exception(f"{error_message}", exc_info=e) # Use logger.exception for traceback - await self.send_error_response(ctx, error_message) - # No specific rollback needed - - # Execute the locked action - await self.execute_user_action_with_lock(member.id, perform_unjail) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(Unjail(bot)) diff --git a/tux/cogs/moderation/untimeout.py b/tux/cogs/moderation/untimeout.py deleted file mode 100644 index 86733e7f7..000000000 --- a/tux/cogs/moderation/untimeout.py +++ /dev/null @@ -1,72 +0,0 @@ -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import UntimeoutFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class Untimeout(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.untimeout.usage = generate_usage(self.untimeout, UntimeoutFlags) - - @commands.hybrid_command( - name="untimeout", - aliases=["ut", "uto", "unmute"], - ) - @commands.guild_only() - @checks.has_pl(2) - async def untimeout( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: UntimeoutFlags, - ) -> None: - """ - Remove timeout from a member. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context in which the command is being invoked. - member : discord.Member - The member to remove timeout from. - flags : UntimeoutFlags - The flags for the command. (reason: str, silent: bool) - - Raises - ------ - discord.DiscordException - If an error occurs while removing the timeout. - """ - assert ctx.guild - - # Check if member is timed out - if not member.is_timed_out(): - await ctx.send(f"{member} is not timed out.", ephemeral=True) - return - - # Check if moderator has permission to untimeout the member - if not await self.check_conditions(ctx, member, ctx.author, "untimeout"): - return - - # Execute untimeout with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.UNTIMEOUT, - user=member, - reason=flags.reason, - silent=flags.silent, - dm_action="removed from timeout", - actions=[(member.timeout(None, reason=flags.reason), type(None))], - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(Untimeout(bot)) diff --git a/tux/cogs/moderation/warn.py b/tux/cogs/moderation/warn.py deleted file mode 100644 index 6bbee6470..000000000 --- a/tux/cogs/moderation/warn.py +++ /dev/null @@ -1,63 +0,0 @@ -import discord -from discord.ext import commands - -from prisma.enums import CaseType -from tux.bot import Tux -from tux.utils import checks -from tux.utils.flags import WarnFlags -from tux.utils.functions import generate_usage - -from . import ModerationCogBase - - -class Warn(ModerationCogBase): - def __init__(self, bot: Tux) -> None: - super().__init__(bot) - self.warn.usage = generate_usage(self.warn, WarnFlags) - - @commands.hybrid_command( - name="warn", - aliases=["w"], - ) - @commands.guild_only() - @checks.has_pl(2) - async def warn( - self, - ctx: commands.Context[Tux], - member: discord.Member, - *, - flags: WarnFlags, - ) -> None: - """ - Warn a member from the server. - - Parameters - ---------- - ctx : commands.Context[Tux] - The context in which the command is being invoked. - member : discord.Member - The member to warn. - flags : WarnFlags - The flags for the command. (reason: str, silent: bool) - """ - assert ctx.guild - - # Check if moderator has permission to warn the member - if not await self.check_conditions(ctx, member, ctx.author, "warn"): - return - - # Execute warn with case creation and DM - await self.execute_mod_action( - ctx=ctx, - case_type=CaseType.WARN, - user=member, - reason=flags.reason, - silent=flags.silent, - dm_action="warned", - # Use dummy coroutine for actions that don't need Discord API calls - actions=[(self._dummy_action(), type(None))], - ) - - -async def setup(bot: Tux) -> None: - await bot.add_cog(Warn(bot)) diff --git a/tux/database/controllers/base.py b/tux/database/controllers/base.py index f407e480d..d6e38f658 100644 --- a/tux/database/controllers/base.py +++ b/tux/database/controllers/base.py @@ -116,9 +116,24 @@ async def _execute_query( span.set_status("ok") return result # noqa: TRY300 except Exception as e: + # Attempt one reconnect + retry when connection lost + if "Can't reach database server" in str(e): + from tux.database.client import db + logger.warning("DB connection lost; attempting to reconnect once and retry the query.") + try: + if not db.is_connected(): + await db.connect() + result = await operation() + span.set_status("ok") + return result + except Exception: + pass span.set_status("internal_error") span.set_data("error", str(e)) - logger.error(f"{error_msg}: {e}") + if "Can't reach database server" in str(e): + logger.error(f"Database connection error: {e}") + else: + logger.error(f"{error_msg}: {e}") raise else: try: diff --git a/tux/database/controllers/guild_config.py b/tux/database/controllers/guild_config.py index 5acda6552..fc198afde 100644 --- a/tux/database/controllers/guild_config.py +++ b/tux/database/controllers/guild_config.py @@ -36,7 +36,12 @@ async def get_guild_config(self, guild_id: int) -> Any: async def get_guild_prefix(self, guild_id: int) -> str | None: """Get a guild prefix from the database.""" config: Any = await self.table.find_first(where={"guild_id": guild_id}) - return None if config is None else config.prefix + if config is None: + # Guild or config missing → create them lazily with default values + await self.ensure_guild_exists(guild_id) + config = await self.insert_guild_config(guild_id) + return config.prefix # default set by Prisma schema default + return config.prefix async def get_log_channel(self, guild_id: int, log_type: str) -> int | None: log_channel_ids: dict[str, GuildConfigScalarFieldKeys] = { diff --git a/tux/handlers/guild_init.py b/tux/handlers/guild_init.py new file mode 100644 index 000000000..4a524e312 --- /dev/null +++ b/tux/handlers/guild_init.py @@ -0,0 +1,36 @@ +"""Ensure guild + guild_config rows exist when the bot starts or joins a guild.""" +from __future__ import annotations + +import discord +from discord.ext import commands + +from tux.bot import Tux +from tux.database.controllers.guild import GuildController +from tux.database.controllers.guild_config import GuildConfigController + + +class GuildInit(commands.Cog): + def __init__(self, bot: Tux): + self.bot = bot + self.guild_controller = GuildController() + self.guild_config_controller = GuildConfigController() + + async def cog_load(self) -> None: # called by discord.py 2.4+ + # Ensure DB rows for all guilds we are already in + for guild in self.bot.guilds: + await self._ensure_guild(guild.id) + + async def _ensure_guild(self, guild_id: int) -> None: + # Upsert guild and guild_config rows + await self.guild_controller.get_or_create_guild(guild_id) + if await self.guild_config_controller.get_guild_config(guild_id) is None: + await self.guild_config_controller.insert_guild_config(guild_id) + + # When the bot is added to a new guild + @commands.Cog.listener() + async def on_guild_join(self, guild: discord.Guild) -> None: + await self._ensure_guild(guild.id) + + +async def setup(bot: Tux) -> None: + await bot.add_cog(GuildInit(bot)) \ No newline at end of file diff --git a/tux/help.py b/tux/help.py index 619907dbe..a30b614c3 100644 --- a/tux/help.py +++ b/tux/help.py @@ -181,18 +181,40 @@ def _format_flag_details(self, command: commands.Command[Any, Any, Any]) -> str: except Exception: return "" + from typing import get_origin, get_args # noqa: WPS433 + for param_annotation in type_hints.values(): - if not isinstance(param_annotation, type) or not issubclass(param_annotation, commands.FlagConverter): - continue - - for flag in param_annotation.__commands_flags__.values(): - flag_str = self._format_flag_name(flag) - if flag.aliases and not getattr(flag, "positional", False): - flag_str += f" ({', '.join(flag.aliases)})" - flag_str += f"\n\t{flag.description or 'No description provided'}" - if flag.default is not discord.utils.MISSING: - flag_str += f"\n\tDefault: {flag.default}" - flag_details.append(flag_str) + origin = get_origin(param_annotation) + args = get_args(param_annotation) + + candidates: list[type] = [] + + # Direct FlagConverter subclass + if isinstance(param_annotation, type): + candidates.append(param_annotation) + + # Union | Optional[...] etc. + if origin in {getattr(__import__('types'), 'UnionType', None), None, list, dict, tuple, set}: # placeholder check + # Not needed + pass + + if origin is not None and args: + for arg in args: + if isinstance(arg, type): + candidates.append(arg) + + for cand in candidates: + if not issubclass(cand, commands.FlagConverter): + continue + + for flag in cand.__commands_flags__.values(): + flag_str = self._format_flag_name(flag) + if flag.aliases and not getattr(flag, "positional", False): + flag_str += f" ({', '.join(flag.aliases)})" + flag_str += f"\n\t{flag.description or 'No description provided'}" + if flag.default is not discord.utils.MISSING: + flag_str += f"\n\tDefault: {flag.default}" + flag_details.append(flag_str) return "\n\n".join(flag_details) diff --git a/tux/utils/flag_factory.py b/tux/utils/flag_factory.py new file mode 100644 index 000000000..015ce4f5c --- /dev/null +++ b/tux/utils/flag_factory.py @@ -0,0 +1,81 @@ +"""Utilities for generating discord.py FlagConverter classes at runtime. + +This supports the dynamic moderation command system: given a command +configuration, generate a `commands.FlagConverter` subclass that exposes the +expected flags (duration, purge, silent, etc.). This allows discord.py's +native flag parsing to work for both prefix and slash commands *and* lets the +custom help command auto-document the available flags. +""" +from __future__ import annotations + +from typing import Any, Dict + +import discord +from discord.ext import commands + +__all__ = ["build_flag_converter"] + + +def _add_flag(attrs: Dict[str, Any], name: str, **kwargs: Any) -> None: + """Helper to add a commands.flag entry to *attrs* if not already present.""" + if name not in attrs: + attrs[name] = commands.flag(**kwargs) + + +def build_flag_converter(identifier: str, *, duration: bool, purge: bool, silent: bool) -> type[commands.FlagConverter] | None: # noqa: D401 + """Return a `FlagConverter` subclass for the given flags. + + Parameters + ---------- + identifier: + A unique string (command name) used to build the class name. + duration / purge / silent: + Booleans indicating whether those flags are enabled. + + Returns + ------- + type[commands.FlagConverter] | None + The dynamically created converter class, or *None* if no flags were + requested. + """ + attrs: Dict[str, Any] = {} + + if duration: + _add_flag( + attrs, + "duration", + description="Duration (e.g. 14d)", + aliases=["d"], + default=None, + positional=True, + ) + + if purge: + _add_flag( + attrs, + "purge", + description="Days of messages to delete (0-7)", + aliases=["p"], + default=0, + ) + + if silent: + _add_flag( + attrs, + "silent", + description="Don't DM the target", + aliases=["s", "quiet"], + default=False, + ) + + # No flags → no converter needed + if not attrs: + return None + + cls_name = f"{identifier.title()}Flags" + FlagsCls = type(cls_name, (commands.FlagConverter,), attrs) + + # Expose in module globals so `typing.get_type_hints` can resolve it when + # the help command introspects the callback signature. + globals()[cls_name] = FlagsCls + return FlagsCls \ No newline at end of file diff --git a/tux/utils/transformers.py b/tux/utils/transformers.py new file mode 100644 index 000000000..344fecac0 --- /dev/null +++ b/tux/utils/transformers.py @@ -0,0 +1,58 @@ +"""Utility transformers and converters for Discord slash + prefix commands.""" +from __future__ import annotations + +import discord +from discord import app_commands +from discord.ext import commands + +__all__ = ["MemberOrUser"] + +class MemberOrUser(app_commands.Transformer, commands.Converter): + """Parameter type that resolves to *Member* if possible, else *User*. + + Works for both prefix (`commands.Converter`) and slash + (`app_commands.Transformer`) invocations so a single annotation can be + used in `@commands.hybrid_command` definitions. + """ + + # --- app_commands.Transformer protocol --------------------------------- + # Slash option type must be declared as a class attribute; older d.py exposes it on the *discord* module. + type: discord.AppCommandOptionType = discord.AppCommandOptionType.user # noqa: D401 + + async def transform( + self, + interaction: discord.Interaction, + value: discord.User, # Discord supplies a User from the USER option + ) -> discord.User | discord.Member: # pragma: no cover + if interaction.guild: + # Try cached member first + member = interaction.guild.get_member(value.id) + if member is None: + try: + member = await interaction.guild.fetch_member(value.id) + except discord.HTTPException: + member = None + return member or value + return value + + # --- commands.Converter protocol -------------------------------------- + async def convert( + self, + ctx: commands.Context, + argument: str | discord.Object | discord.User | discord.Member, + ) -> discord.User | discord.Member: # pragma: no cover + # Let the built-in User converter do the heavy lifting + try: + user: discord.User | discord.Member = await commands.UserConverter().convert(ctx, str(argument)) + except commands.BadArgument as exc: + raise exc + + if isinstance(ctx.guild, discord.Guild): + member = ctx.guild.get_member(user.id) + if member is None: + try: + member = await ctx.guild.fetch_member(user.id) + except discord.HTTPException: + member = None + return member or user + return user \ No newline at end of file