From e76db105b6d1eb8a07675c05a573a67f510417a4 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Wed, 27 Sep 2023 11:40:19 +0100 Subject: [PATCH 1/3] Change handling of Murfey DB connections in server websocket code --- src/murfey/server/websocket.py | 41 ++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/murfey/server/websocket.py b/src/murfey/server/websocket.py index 614b51396..8ffc61d9e 100644 --- a/src/murfey/server/websocket.py +++ b/src/murfey/server/websocket.py @@ -7,9 +7,9 @@ from typing import Any, Dict, Generic, TypeVar from fastapi import APIRouter, WebSocket, WebSocketDisconnect -from sqlmodel import select +from sqlmodel import Session, create_engine, select -from murfey.server.murfey_db import get_murfey_db_session +from murfey.server.murfey_db import get_murfey_db_session, url from murfey.util.db import ClientEnvironment from murfey.util.state import State, global_state @@ -18,6 +18,8 @@ ws = APIRouter(prefix="/ws", tags=["websocket"]) log = logging.getLogger("murfey.server.websocket") +engine = create_engine(url()) + class ConnectionManager(Generic[T]): def __init__(self, state: State[T]): @@ -35,19 +37,20 @@ async def connect(self, websocket: WebSocket, client_id: int): def _register_new_client(client_id: int): new_client = ClientEnvironment(client_id=client_id, connected=True) murfey_db = next(get_murfey_db_session()) - murfey_db.add(new_client) - murfey_db.commit() - murfey_db.close() + with Session(engine) as murfey_db: + murfey_db.add(new_client) + murfey_db.commit() def disconnect(self, websocket: WebSocket, client_id: int): self.active_connections.pop(client_id) - murfey_db = next(get_murfey_db_session()) - client_env = murfey_db.exec( - select(ClientEnvironment).where(ClientEnvironment.client_id == client_id) - ).one() - murfey_db.delete(client_env) - murfey_db.commit() - murfey_db.close() + with Session(engine) as murfey_db: + client_env = murfey_db.exec( + select(ClientEnvironment).where( + ClientEnvironment.client_id == client_id + ) + ).one() + murfey_db.delete(client_env) + murfey_db.commit() async def broadcast(self, message: str): for connection in self.active_connections: @@ -120,13 +123,13 @@ async def forward_log(logrecord: dict[str, Any], websocket: WebSocket): @ws.delete("/test/{client_id}") async def close_ws_connection(client_id: int): murfey_db = next(get_murfey_db_session()) - client_env = murfey_db.exec( - select(ClientEnvironment).where(ClientEnvironment.client_id == client_id) - ).one() - client_env.connected = False - murfey_db.add(client_env) - murfey_db.commit() - murfey_db.close() + with Session(engine) as murfey_db: + client_env = murfey_db.exec( + select(ClientEnvironment).where(ClientEnvironment.client_id == client_id) + ).one() + client_env.connected = False + murfey_db.add(client_env) + murfey_db.commit() client_id_str = str(client_id).replace("\r\n", "").replace("\n", "") log.info(f"Disconnecting {client_id_str}") manager.disconnect(manager.active_connections[client_id], client_id) From dea859cd76e96395e802ba021e586c9da0244c13 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Wed, 27 Sep 2023 14:49:24 +0100 Subject: [PATCH 2/3] Move DB session creation --- src/murfey/server/websocket.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/murfey/server/websocket.py b/src/murfey/server/websocket.py index 8ffc61d9e..f14769264 100644 --- a/src/murfey/server/websocket.py +++ b/src/murfey/server/websocket.py @@ -18,14 +18,13 @@ ws = APIRouter(prefix="/ws", tags=["websocket"]) log = logging.getLogger("murfey.server.websocket") -engine = create_engine(url()) - class ConnectionManager(Generic[T]): def __init__(self, state: State[T]): self.active_connections: Dict[int, WebSocket] = {} self._state = state self._state.subscribe(self._broadcast_state_update) + self._engine = create_engine(url()) async def connect(self, websocket: WebSocket, client_id: int): await websocket.accept() @@ -33,17 +32,16 @@ async def connect(self, websocket: WebSocket, client_id: int): self._register_new_client(client_id) await websocket.send_json({"message": "state-full", "state": self._state.data}) - @staticmethod - def _register_new_client(client_id: int): + def _register_new_client(self, client_id: int): new_client = ClientEnvironment(client_id=client_id, connected=True) murfey_db = next(get_murfey_db_session()) - with Session(engine) as murfey_db: + with Session(self._engine) as murfey_db: murfey_db.add(new_client) murfey_db.commit() def disconnect(self, websocket: WebSocket, client_id: int): self.active_connections.pop(client_id) - with Session(engine) as murfey_db: + with Session(self._engine) as murfey_db: client_env = murfey_db.exec( select(ClientEnvironment).where( ClientEnvironment.client_id == client_id @@ -122,6 +120,7 @@ async def forward_log(logrecord: dict[str, Any], websocket: WebSocket): @ws.delete("/test/{client_id}") async def close_ws_connection(client_id: int): + engine = create_engine(url()) murfey_db = next(get_murfey_db_session()) with Session(engine) as murfey_db: client_env = murfey_db.exec( From 033b8ea57b02d5e10b950101705d636a972a3733 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Wed, 27 Sep 2023 14:53:51 +0100 Subject: [PATCH 3/3] Bit more moving around to get tests to run --- src/murfey/server/websocket.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/murfey/server/websocket.py b/src/murfey/server/websocket.py index f14769264..73a565d94 100644 --- a/src/murfey/server/websocket.py +++ b/src/murfey/server/websocket.py @@ -24,7 +24,6 @@ def __init__(self, state: State[T]): self.active_connections: Dict[int, WebSocket] = {} self._state = state self._state.subscribe(self._broadcast_state_update) - self._engine = create_engine(url()) async def connect(self, websocket: WebSocket, client_id: int): await websocket.accept() @@ -33,15 +32,17 @@ async def connect(self, websocket: WebSocket, client_id: int): await websocket.send_json({"message": "state-full", "state": self._state.data}) def _register_new_client(self, client_id: int): + engine = create_engine(url()) new_client = ClientEnvironment(client_id=client_id, connected=True) murfey_db = next(get_murfey_db_session()) - with Session(self._engine) as murfey_db: + with Session(engine) as murfey_db: murfey_db.add(new_client) murfey_db.commit() def disconnect(self, websocket: WebSocket, client_id: int): + engine = create_engine(url()) self.active_connections.pop(client_id) - with Session(self._engine) as murfey_db: + with Session(engine) as murfey_db: client_env = murfey_db.exec( select(ClientEnvironment).where( ClientEnvironment.client_id == client_id