Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import quart

from .. import group
from ... import group


@group.group_class('pipelines', '/api/v1/pipelines')
Expand Down
79 changes: 79 additions & 0 deletions pkg/api/http/controller/groups/pipelines/webchat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import quart

from ... import group


@group.group_class('webchat', '/api/v1/pipelines/<pipeline_uuid>/chat')
class WebChatDebugRouterGroup(group.RouterGroup):
async def initialize(self) -> None:
@self.route('/send', methods=['POST'])
async def send_message(pipeline_uuid: str) -> str:
"""发送调试消息到流水线"""
try:
data = await quart.request.get_json()
session_type = data.get('session_type', 'person')
message_chain_obj = data.get('message', [])

if not message_chain_obj:
return self.http_status(400, -1, 'message is required')

if session_type not in ['person', 'group']:
return self.http_status(400, -1, 'session_type must be person or group')

webchat_adapter = self.ap.platform_mgr.webchat_proxy_bot.adapter

if not webchat_adapter:
return self.http_status(404, -1, 'WebChat adapter not found')

result = await webchat_adapter.send_webchat_message(pipeline_uuid, session_type, message_chain_obj)

return self.success(
data={
'message': result,
}
)

except Exception as e:
return self.http_status(500, -1, f'Internal server error: {str(e)}')

@self.route('/messages/<session_type>', methods=['GET'])
async def get_messages(pipeline_uuid: str, session_type: str) -> str:
"""获取调试消息历史"""
try:
if session_type not in ['person', 'group']:
return self.http_status(400, -1, 'session_type must be person or group')

webchat_adapter = self.ap.platform_mgr.webchat_proxy_bot.adapter

if not webchat_adapter:
return self.http_status(404, -1, 'WebChat adapter not found')

messages = webchat_adapter.get_webchat_messages(pipeline_uuid, session_type)

return self.success(data={'messages': messages})

except Exception as e:
return self.http_status(500, -1, f'Internal server error: {str(e)}')

@self.route('/reset/<session_type>', methods=['POST'])
async def reset_session(session_type: str) -> str:
"""重置调试会话"""
try:
if session_type not in ['person', 'group']:
return self.http_status(400, -1, 'session_type must be person or group')

webchat_adapter = None
for bot in self.ap.platform_mgr.bots:
if hasattr(bot.adapter, '__class__') and bot.adapter.__class__.__name__ == 'WebChatAdapter':
webchat_adapter = bot.adapter
break

if not webchat_adapter:
return self.http_status(404, -1, 'WebChat adapter not found')

webchat_adapter.reset_debug_session(session_type)

return self.success(data={'message': 'Session reset successfully'})

except Exception as e:
return self.http_status(500, -1, f'Internal server error: {str(e)}')
2 changes: 2 additions & 0 deletions pkg/api/http/controller/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from . import group
from .groups import provider as groups_provider
from .groups import platform as groups_platform
from .groups import pipelines as groups_pipelines

importutil.import_modules_in_pkg(groups)
importutil.import_modules_in_pkg(groups_provider)
importutil.import_modules_in_pkg(groups_platform)
importutil.import_modules_in_pkg(groups_pipelines)


class HTTPController:
Expand Down
5 changes: 4 additions & 1 deletion pkg/persistence/mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ async def create_tables(self):

# write default pipeline
result = await self.execute_async(sqlalchemy.select(pipeline.LegacyPipeline))
default_pipeline_uuid = None
if result.first() is None:
self.ap.logger.info('Creating default pipeline...')

pipeline_config = json.load(open('templates/default-pipeline-config.json', 'r', encoding='utf-8'))

default_pipeline_uuid = str(uuid.uuid4())
pipeline_data = {
'uuid': str(uuid.uuid4()),
'uuid': default_pipeline_uuid,
'for_version': self.ap.ver_mgr.get_current_version(),
'stages': pipeline_service.default_stage_order,
'is_default': True,
Expand All @@ -82,6 +84,7 @@ async def create_tables(self):
}

await self.execute_async(sqlalchemy.insert(pipeline.LegacyPipeline).values(pipeline_data))

# =================================

# run migrations
Expand Down
9 changes: 4 additions & 5 deletions pkg/pipeline/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ async def _process_query(selected_query: entities.Query):
# find pipeline
# Here firstly find the bot, then find the pipeline, in case the bot adapter's config is not the latest one.
# Like aiocqhttp, once a client is connected, even the adapter was updated and restarted, the existing client connection will not be affected.
bot = await self.ap.platform_mgr.get_bot_by_uuid(selected_query.bot_uuid)
if bot:
pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(
bot.bot_entity.use_pipeline_uuid
)
pipeline_uuid = selected_query.pipeline_uuid

if pipeline_uuid:
pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
if pipeline:
await pipeline.run(selected_query)

Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async def add_query(
message_event: platform_events.MessageEvent,
message_chain: platform_message.MessageChain,
adapter: msadapter.MessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None,
) -> entities.Query:
async with self.condition:
query = entities.Query(
Expand All @@ -48,6 +49,7 @@ async def add_query(
resp_messages=[],
resp_message_chain=[],
adapter=adapter,
pipeline_uuid=pipeline_uuid,
)
self.queries.append(query)
self.query_id_counter += 1
Expand Down
36 changes: 34 additions & 2 deletions pkg/platform/botmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import traceback
import sqlalchemy


# FriendMessage, Image, MessageChain, Plain
from . import adapter as msadapter

Expand Down Expand Up @@ -78,6 +77,7 @@ async def on_friend_message(
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=self.bot_entity.use_pipeline_uuid,
)

async def on_group_message(
Expand All @@ -102,6 +102,7 @@ async def on_group_message(
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=self.bot_entity.use_pipeline_uuid,
)

self.adapter.register_listener(platform_events.FriendMessage, on_friend_message)
Expand Down Expand Up @@ -144,6 +145,8 @@ class PlatformManager:

bots: list[RuntimeBot]

webchat_proxy_bot: RuntimeBot

adapter_components: list[engine.Component]

adapter_dict: dict[str, type[msadapter.MessagePlatformAdapter]]
Expand All @@ -161,6 +164,31 @@ async def initialize(self):
adapter_dict[component.metadata.name] = component.get_python_component_class()
self.adapter_dict = adapter_dict

webchat_adapter_class = self.adapter_dict['webchat']

# initialize webchat adapter
webchat_logger = EventLogger(name='webchat-adapter', ap=self.ap)
webchat_adapter_inst = webchat_adapter_class(
{},
self.ap,
webchat_logger,
)

self.webchat_proxy_bot = RuntimeBot(
ap=self.ap,
bot_entity=persistence_bot.Bot(
uuid='webchat-proxy-bot',
name='WebChat',
description='',
adapter='webchat',
adapter_config={},
enable=True,
),
adapter=webchat_adapter_inst,
logger=webchat_logger,
)
await self.webchat_proxy_bot.initialize()

await self.load_bots_from_db()

def get_running_adapters(self) -> list[msadapter.MessagePlatformAdapter]:
Expand Down Expand Up @@ -220,7 +248,9 @@ async def remove_bot(self, bot_uuid: str):
return

def get_available_adapters_info(self) -> list[dict]:
return [component.to_plain_dict() for component in self.adapter_components]
return [
component.to_plain_dict() for component in self.adapter_components if component.metadata.name != 'webchat'
]

def get_available_adapter_info_by_name(self, name: str) -> dict | None:
for component in self.adapter_components:
Expand Down Expand Up @@ -273,6 +303,8 @@ async def write_back_config(

async def run(self):
# This method will only be called when the application launching
await self.webchat_proxy_bot.run()

for bot in self.bots:
if bot.enable:
await bot.run()
Expand Down
Loading
Loading