From 13e1fd4e1ec9aaf95abb396c0b0b21ecd1a31fd7 Mon Sep 17 00:00:00 2001 From: kzndotsh Date: Sun, 20 Jul 2025 00:22:06 -0400 Subject: [PATCH] feat: enhance service cogs with improved functionality - Add new reminders service - Update levels service integration - Improve influxdb logging capabilities - Enhance gif limiter and status roles --- tux/cogs/services/gif_limiter.py | 20 +++- tux/cogs/services/influxdblogger.py | 109 +++++++++++++++----- tux/cogs/services/levels.py | 48 +++++---- tux/cogs/services/reminders.py | 151 ++++++++++++++++++++++++++++ tux/cogs/services/status_roles.py | 24 ++--- 5 files changed, 289 insertions(+), 63 deletions(-) create mode 100644 tux/cogs/services/reminders.py diff --git a/tux/cogs/services/gif_limiter.py b/tux/cogs/services/gif_limiter.py index b9f7a694d..9aa1d989d 100644 --- a/tux/cogs/services/gif_limiter.py +++ b/tux/cogs/services/gif_limiter.py @@ -4,6 +4,7 @@ import discord from discord.ext import commands, tasks +from loguru import logger from tux.bot import Tux from tux.utils.config import CONFIG @@ -120,7 +121,7 @@ async def on_message(self, message: discord.Message) -> None: if await self._should_process_message(message): await self._handle_gif_message(message) - @tasks.loop(seconds=20) + @tasks.loop(seconds=20, name="old_gif_remover") async def old_gif_remover(self) -> None: """ Regularly cleans old GIF timestamps @@ -143,6 +144,23 @@ async def old_gif_remover(self) -> None: else: del self.recent_gifs_by_user[user_id] + @old_gif_remover.before_loop + async def before_old_gif_remover(self) -> None: + """Wait until the bot is ready.""" + await self.bot.wait_until_ready() + + @old_gif_remover.error + async def on_old_gif_remover_error(self, error: BaseException) -> None: + """Handles errors in the old_gif_remover loop.""" + logger.error(f"Error in old_gif_remover loop: {error}") + + if isinstance(error, Exception): + self.bot.sentry_manager.capture_exception(error) + else: + # For BaseExceptions that are not Exceptions (like KeyboardInterrupt), + # it's often better to let them propagate. + raise error + async def cog_unload(self) -> None: """Cancel the background task when the cog is unloaded.""" self.old_gif_remover.cancel() diff --git a/tux/cogs/services/influxdblogger.py b/tux/cogs/services/influxdblogger.py index fada085b5..863a20ec1 100644 --- a/tux/cogs/services/influxdblogger.py +++ b/tux/cogs/services/influxdblogger.py @@ -1,5 +1,6 @@ from typing import Any +import discord from discord.ext import commands, tasks from influxdb_client.client.influxdb_client import InfluxDBClient from influxdb_client.client.write.point import Point @@ -12,13 +13,14 @@ class InfluxLogger(commands.Cog): - def __init__(self, bot: Tux): + def __init__(self, bot: Tux) -> None: self.bot = bot self.db = DatabaseController() self.influx_write_api: Any | None = None self.influx_org: str = "" if self.init_influx(): + self._log_guild_stats.start() self.logger.start() else: logger.warning("InfluxDB logger failed to init. Check .env configuration if you want to use it.") @@ -42,7 +44,46 @@ def init_influx(self) -> bool: return True return False - @tasks.loop(seconds=60) + @tasks.loop(seconds=60, name="influx_guild_stats") + async def _log_guild_stats(self) -> None: + """Logs guild statistics to InfluxDB.""" + if not self.bot.is_ready() or not self.influx_write_api: + logger.debug("Bot not ready or InfluxDB writer not initialized, skipping InfluxDB logging.") + return + + for guild in self.bot.guilds: + online_members = sum(m.status != discord.Status.offline for m in guild.members) + + tags = {"guild": guild.name} + fields = { + "members": guild.member_count, + "online": online_members, + } + + point = {"measurement": "guild_stats", "tags": tags, "fields": fields} + + self.influx_write_api.write(bucket="tux_stats", org=self.influx_org, record=point) + + @_log_guild_stats.before_loop + async def before_log_guild_stats(self) -> None: + """Wait until the bot is ready.""" + await self.bot.wait_until_ready() + + @_log_guild_stats.error + async def on_log_guild_stats_error(self, error: BaseException) -> None: + """Handles errors in the guild stats logging loop.""" + logger.error(f"Error in InfluxDB guild stats logger loop: {error}") + if isinstance(error, Exception): + self.bot.sentry_manager.capture_exception(error) + else: + raise error + + async def cog_unload(self) -> None: + if self.influx_write_api: + self._log_guild_stats.cancel() + self.logger.cancel() + + @tasks.loop(seconds=60, name="influx_db_logger") async def logger(self) -> None: """Log statistics to InfluxDB at regular intervals. @@ -55,40 +96,54 @@ async def logger(self) -> None: influx_bucket = "tux stats" # Collect the guild list from the database - try: - guild_list = await self.db.guild.find_many(where={}) + guild_list = await self.db.guild.find_many(where={}) + + # Iterate through each guild and collect metrics + for guild in guild_list: + if not guild.guild_id: + continue - # Iterate through each guild and collect metrics - for guild in guild_list: - if not guild.guild_id: - continue + guild_id = int(guild.guild_id) - guild_id = int(guild.guild_id) + # Collect data by querying controllers + starboard_stats = await self.db.starboard_message.find_many(where={"message_guild_id": guild_id}) - # Collect data by querying controllers - starboard_stats = await self.db.starboard_message.find_many(where={"message_guild_id": guild_id}) + snippet_stats = await self.db.snippet.find_many(where={"guild_id": guild_id}) - snippet_stats = await self.db.snippet.find_many(where={"guild_id": guild_id}) + afk_stats = await self.db.afk.find_many(where={"guild_id": guild_id}) - afk_stats = await self.db.afk.find_many(where={"guild_id": guild_id}) + case_stats = await self.db.case.find_many(where={"guild_id": guild_id}) - case_stats = await self.db.case.find_many(where={"guild_id": guild_id}) + # Create data points with type ignores for InfluxDB methods + # The InfluxDB client's type hints are incomplete + points: list[Point] = [ + Point("guild stats").tag("guild", guild_id).field("starboard count", len(starboard_stats)), # type: ignore + Point("guild stats").tag("guild", guild_id).field("snippet count", len(snippet_stats)), # type: ignore + Point("guild stats").tag("guild", guild_id).field("afk count", len(afk_stats)), # type: ignore + Point("guild stats").tag("guild", guild_id).field("case count", len(case_stats)), # type: ignore + ] - # Create data points with type ignores for InfluxDB methods - # The InfluxDB client's type hints are incomplete - points: list[Point] = [ - Point("guild stats").tag("guild", guild_id).field("starboard count", len(starboard_stats)), # type: ignore - Point("guild stats").tag("guild", guild_id).field("snippet count", len(snippet_stats)), # type: ignore - Point("guild stats").tag("guild", guild_id).field("afk count", len(afk_stats)), # type: ignore - Point("guild stats").tag("guild", guild_id).field("case count", len(case_stats)), # type: ignore - ] + # Write to InfluxDB + self.influx_write_api.write(bucket=influx_bucket, org=self.influx_org, record=points) - # Write to InfluxDB - self.influx_write_api.write(bucket=influx_bucket, org=self.influx_org, record=points) + @logger.before_loop + async def before_logger(self) -> None: + """Wait until the bot is ready.""" + await self.bot.wait_until_ready() - except Exception as e: - logger.error(f"Error collecting metrics for InfluxDB: {e}") + @logger.error + async def on_logger_error(self, error: BaseException) -> None: + """Handles errors in the logger loop.""" + logger.error(f"Error in InfluxDB logger loop: {error}") + if isinstance(error, Exception): + self.bot.sentry_manager.capture_exception(error) + else: + raise error async def setup(bot: Tux) -> None: - await bot.add_cog(InfluxLogger(bot)) + # Only load the cog if InfluxDB configuration is available + if all([CONFIG.INFLUXDB_TOKEN, CONFIG.INFLUXDB_URL, CONFIG.INFLUXDB_ORG]): + await bot.add_cog(InfluxLogger(bot)) + else: + logger.warning("InfluxDB configuration incomplete, skipping InfluxLogger cog") diff --git a/tux/cogs/services/levels.py b/tux/cogs/services/levels.py index 2f0b25ca5..c0b3f0fb6 100644 --- a/tux/cogs/services/levels.py +++ b/tux/cogs/services/levels.py @@ -5,7 +5,7 @@ from discord.ext import commands from loguru import logger -from tux.app import get_prefix +from prisma.models import Levels from tux.bot import Tux from tux.database.controllers import DatabaseController from tux.ui.embeds import EmbedCreator @@ -33,20 +33,40 @@ async def xp_listener(self, message: discord.Message) -> None: message : discord.Message The message object. """ - if message.author.bot or message.guild is None or message.channel.id in CONFIG.XP_BLACKLIST_CHANNELS: + if message.author.bot or not message.guild or message.channel.id in CONFIG.XP_BLACKLIST_CHANNELS: return - prefixes = await get_prefix(self.bot, message) - if any(message.content.startswith(prefix) for prefix in prefixes): + # Ignore messages that are commands + ctx = await self.bot.get_context(message) + if ctx.valid: return + # Fetch member object member = message.guild.get_member(message.author.id) - if member is None: + if not member: return - await self.process_xp_gain(member, message.guild) + # Fetch all user level data in one query to minimize DB calls + user_level_data = await self.db.levels.get_user_level_data(member.id, message.guild.id) - async def process_xp_gain(self, member: discord.Member, guild: discord.Guild) -> None: + # Check if the user is blacklisted + if user_level_data and user_level_data.blacklisted: + return + + # Check if the user is on cooldown + last_message_time = user_level_data.last_message if user_level_data else None + if last_message_time and self.is_on_cooldown(last_message_time): + return + + # Process XP gain with the already fetched data + await self.process_xp_gain(member, message.guild, user_level_data) + + async def process_xp_gain( + self, + member: discord.Member, + guild: discord.Guild, + user_level_data: Levels | None, + ) -> None: """ Processes XP gain for a member. @@ -56,17 +76,11 @@ async def process_xp_gain(self, member: discord.Member, guild: discord.Guild) -> The member gaining XP. guild : discord.Guild The guild where the member is gaining XP. + user_level_data : Levels | None + The existing level data for the user. """ - # Get blacklist status - is_blacklisted = await self.db.levels.is_blacklisted(member.id, guild.id) - if is_blacklisted: - return - - last_message_time = await self.db.levels.get_last_message_time(member.id, guild.id) - if last_message_time and self.is_on_cooldown(last_message_time): - return - - current_xp, current_level = await self.db.levels.get_xp_and_level(member.id, guild.id) + current_xp = user_level_data.xp if user_level_data else 0.0 + current_level = user_level_data.level if user_level_data else 0 xp_increment = self.calculate_xp_increment(member) new_xp = current_xp + xp_increment diff --git a/tux/cogs/services/reminders.py b/tux/cogs/services/reminders.py new file mode 100644 index 000000000..5442f6e17 --- /dev/null +++ b/tux/cogs/services/reminders.py @@ -0,0 +1,151 @@ +import asyncio +import contextlib +import datetime +import heapq +from collections.abc import Coroutine +from typing import Any, NamedTuple + +import discord +from discord.ext import commands, tasks +from loguru import logger + +from prisma.models import Reminder +from tux.bot import Tux +from tux.database.controllers import DatabaseController +from tux.ui.embeds import EmbedCreator + + +class ScheduledReminder(NamedTuple): + """A scheduled reminder entry for the priority queue.""" + + timestamp: float + reminder: Reminder + + +class ReminderService(commands.Cog): + def __init__(self, bot: Tux) -> None: + self.bot = bot + self.db = DatabaseController() + self._initialized = False + # Use a heap as a priority queue for reminders + self._reminder_queue: list[ScheduledReminder] = [] + self._queue_lock = asyncio.Lock() + # Store task references to prevent garbage collection + self._reminder_tasks: set[asyncio.Task[Any]] = set() + self.reminder_processor.start() + + def _create_reminder_task(self, coro: Coroutine[Any, Any, Any], name: str) -> None: + """Create a task and store its reference to prevent garbage collection.""" + task = asyncio.create_task(coro, name=name) + self._reminder_tasks.add(task) + task.add_done_callback(self._reminder_tasks.discard) + + async def cog_unload(self) -> None: + """Clean up resources when the cog is unloaded.""" + self.reminder_processor.cancel() + # Cancel any pending reminder tasks + for task in self._reminder_tasks.copy(): + task.cancel() + await asyncio.gather(*self._reminder_tasks, return_exceptions=True) + self._reminder_tasks.clear() + + @tasks.loop(seconds=5, name="reminder_processor") + async def reminder_processor(self) -> None: + """Process reminders from the priority queue.""" + current_time = datetime.datetime.now(datetime.UTC).timestamp() + + async with self._queue_lock: + # Process all reminders that are due + while self._reminder_queue and self._reminder_queue[0].timestamp <= current_time: + scheduled_reminder = heapq.heappop(self._reminder_queue) + # Schedule the reminder sending as a separate task to avoid blocking the loop + self._create_reminder_task( + self.send_reminder(scheduled_reminder.reminder), + f"send_reminder_{scheduled_reminder.reminder.reminder_id}", + ) + + @reminder_processor.before_loop + async def before_reminder_processor(self) -> None: + """Wait until the bot is ready.""" + await self.bot.wait_until_ready() + + @reminder_processor.error + async def on_reminder_processor_error(self, error: BaseException) -> None: + """Handles errors in the reminder processor loop.""" + logger.error(f"Error in reminder processor loop: {error}") + if isinstance(error, Exception): + self.bot.sentry_manager.capture_exception(error) + else: + raise error + + async def schedule_reminder(self, reminder: Reminder) -> None: + """Add a reminder to the priority queue.""" + scheduled = ScheduledReminder(timestamp=reminder.reminder_expires_at.timestamp(), reminder=reminder) + + async with self._queue_lock: + heapq.heappush(self._reminder_queue, scheduled) + + async def send_reminder(self, reminder: Reminder) -> None: + user = self.bot.get_user(reminder.reminder_user_id) + if user is not None: + embed = EmbedCreator.create_embed( + bot=self.bot, + embed_type=EmbedCreator.INFO, + user_name=user.name, + user_display_avatar=user.display_avatar.url, + title="Reminder", + description=reminder.reminder_content, + ) + + try: + await user.send(embed=embed) + + except discord.Forbidden: + channel = self.bot.get_channel(reminder.reminder_channel_id) + + if isinstance(channel, discord.TextChannel | discord.Thread | discord.VoiceChannel): + with contextlib.suppress(discord.Forbidden): + await channel.send( + content=f"{user.mention} Failed to DM you, sending in channel", + embed=embed, + ) + return + + else: + logger.error( + f"Failed to send reminder {reminder.reminder_id}, DMs closed and channel not found.", + ) + + else: + logger.error( + f"Failed to send reminder {reminder.reminder_id}, user with ID {reminder.reminder_user_id} not found.", + ) + + try: + await self.db.reminder.delete_reminder_by_id(reminder.reminder_id) + except Exception as e: + logger.error(f"Failed to delete reminder: {e}") + + @commands.Cog.listener() + async def on_ready(self) -> None: + if self._initialized: + return + + self._initialized = True + + reminders = await self.db.reminder.get_all_reminders() + dt_now = datetime.datetime.now(datetime.UTC) + + for reminder in reminders: + if reminder.reminder_expires_at <= dt_now: + # Send expired reminders immediately + self._create_reminder_task(self.send_reminder(reminder), f"expired_reminder_{reminder.reminder_id}") + else: + # Schedule future reminders + await self.schedule_reminder(reminder) + + logger.info(f"Loaded {len(reminders)} existing reminders into queue") + + +async def setup(bot: Tux) -> None: + await bot.add_cog(ReminderService(bot)) diff --git a/tux/cogs/services/status_roles.py b/tux/cogs/services/status_roles.py index a03969660..4c6b60bd2 100644 --- a/tux/cogs/services/status_roles.py +++ b/tux/cogs/services/status_roles.py @@ -1,4 +1,3 @@ -import asyncio import re import discord @@ -14,23 +13,7 @@ class StatusRoles(commands.Cog): def __init__(self, bot: commands.Bot): self.bot = bot self.status_roles = CONFIG.STATUS_ROLES - self._unload_task = None # Store task reference here - - # Check if config exists and is valid - if not self.status_roles: - logger.warning("No status roles configurations found. Unloading StatusRoles cog.") - # Store the task reference - self._unload_task = asyncio.create_task(self._unload_self()) - else: - logger.info(f"StatusRoles cog initialized with {len(self.status_roles)} role configurations") - - async def _unload_self(self): - """Unload this cog if configuration is missing.""" - try: - await self.bot.unload_extension("tux.cogs.services.status_roles") - logger.info("StatusRoles cog has been unloaded due to missing configuration") - except Exception as e: - logger.error(f"Failed to unload StatusRoles cog: {e}") + logger.info(f"StatusRoles cog initialized with {len(self.status_roles)} role configurations") @commands.Cog.listener() async def on_ready(self): @@ -126,4 +109,9 @@ async def check_and_update_roles(self, member: discord.Member): async def setup(bot: commands.Bot): + # Check if config exists and is valid before loading the cog + if not CONFIG.STATUS_ROLES: + logger.warning("No status roles configurations found. Skipping StatusRoles cog.") + return + await bot.add_cog(StatusRoles(bot))