Skip to content

Commit f95d859

Browse files
Thread Tracing
1 parent 3177017 commit f95d859

File tree

2 files changed

+68
-11
lines changed

2 files changed

+68
-11
lines changed

deepgram/clients/live/v1/async_client.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import logging
77
from typing import Dict, Union, Optional, cast, Any
8+
import threading
89

910
import websockets
1011
from websockets.client import WebSocketClientProtocol
@@ -49,8 +50,8 @@ class AsyncLiveClient: # pylint: disable=too-many-instance-attributes
4950

5051
_socket: WebSocketClientProtocol
5152
_event_handlers: Dict[LiveTranscriptionEvents, list]
52-
_listen_thread: asyncio.Task
53-
_keep_alive_thread: asyncio.Task
53+
_listen_thread: Union[asyncio.Task, None]
54+
_keep_alive_thread: Union[asyncio.Task, None]
5455

5556
_kwargs: Optional[Dict] = None
5657
_addons: Optional[Dict] = None
@@ -142,6 +143,11 @@ async def start(
142143
)
143144
self._exit_event.clear()
144145

146+
# debug the threads
147+
for thread in threading.enumerate():
148+
self._logger.debug("after running thread: %s", thread.name)
149+
self._logger.debug("number of active threads: %s", threading.active_count())
150+
145151
# listen thread
146152
self._listen_thread = asyncio.create_task(self._listening())
147153

@@ -152,6 +158,11 @@ async def start(
152158
else:
153159
self._logger.notice("keepalive is disabled")
154160

161+
# debug the threads
162+
for thread in threading.enumerate():
163+
self._logger.debug("after running thread: %s", thread.name)
164+
self._logger.debug("number of active threads: %s", threading.active_count())
165+
155166
# push open event
156167
await self._emit(
157168
LiveTranscriptionEvents(LiveTranscriptionEvents.Open),
@@ -195,11 +206,28 @@ async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None:
195206
"""
196207
Emits events to the registered event handlers.
197208
"""
209+
210+
self._logger.debug("AsyncLiveClient._emit ENTER")
211+
212+
# debug the threads
213+
for thread in threading.enumerate():
214+
self._logger.debug("after running thread: %s", thread.name)
215+
self._logger.debug("number of active threads: %s", threading.active_count())
216+
217+
tasks = []
198218
for handler in self._event_handlers[event]:
199-
if asyncio.iscoroutinefunction(handler):
200-
await handler(self, *args, **kwargs)
201-
else:
202-
asyncio.create_task(handler(self, *args, **kwargs))
219+
tasks.append(asyncio.create_task(handler(self, *args, **kwargs)))
220+
221+
if len(tasks) > 0:
222+
self._logger.debug("waiting for tasks to finish...")
223+
await asyncio.gather(*filter(None, tasks), return_exceptions=True)
224+
225+
# debug the threads
226+
for thread in threading.enumerate():
227+
self._logger.debug("after running thread: %s", thread.name)
228+
self._logger.debug("number of active threads: %s", threading.active_count())
229+
230+
self._logger.debug("AsyncLiveClient._emit LEAVE")
203231

204232
# pylint: disable=too-many-return-statements,too-many-statements,too-many-locals
205233
async def _listening(self) -> None:
@@ -608,11 +636,15 @@ async def finish(self) -> bool:
608636
self._logger.verbose("cancelling tasks...")
609637
try:
610638
# Before cancelling, check if the tasks were created
639+
# debug the threads
640+
for thread in threading.enumerate():
641+
self._logger.debug("before running thread: %s", thread.name)
642+
self._logger.debug("number of active threads: %s", threading.active_count())
643+
611644
tasks = []
612-
if self._config.options.get("keepalive") == "true":
613-
if self._keep_alive_thread is not None:
614-
self._keep_alive_thread.cancel()
615-
tasks.append(self._keep_alive_thread)
645+
if self._keep_alive_thread is not None:
646+
self._keep_alive_thread.cancel()
647+
tasks.append(self._keep_alive_thread)
616648
if self._listen_thread is not None:
617649
self._listen_thread.cancel()
618650
tasks.append(self._listen_thread)
@@ -621,6 +653,11 @@ async def finish(self) -> bool:
621653
await asyncio.gather(*filter(None, tasks), return_exceptions=True)
622654
self._logger.notice("threads joined")
623655

656+
# debug the threads
657+
for thread in threading.enumerate():
658+
self._logger.debug("after running thread: %s", thread.name)
659+
self._logger.debug("number of active threads: %s", threading.active_count())
660+
624661
self._logger.notice("finish succeeded")
625662
self._logger.spam("AsyncLiveClient.finish LEAVE")
626663
return True

deepgram/clients/live/v1/client.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
33
# SPDX-License-Identifier: MIT
44
import json
5-
import threading
65
import time
76
import logging
87
from typing import Dict, Union, Optional, cast, Any
8+
import threading
99

1010
from websockets.sync.client import connect, ClientConnection
1111
import websockets
@@ -141,6 +141,11 @@ def start(
141141
self._socket = connect(url_with_params, additional_headers=combined_headers)
142142
self._exit_event.clear()
143143

144+
# debug the threads
145+
for thread in threading.enumerate():
146+
self._logger.debug("after running thread: %s", thread.name)
147+
self._logger.debug("number of active threads: %s", threading.active_count())
148+
144149
# listening thread
145150
self._listen_thread = threading.Thread(target=self._listening)
146151
self._listen_thread.start()
@@ -153,6 +158,11 @@ def start(
153158
else:
154159
self._logger.notice("keepalive is disabled")
155160

161+
# debug the threads
162+
for thread in threading.enumerate():
163+
self._logger.debug("after running thread: %s", thread.name)
164+
self._logger.debug("number of active threads: %s", threading.active_count())
165+
156166
# push open event
157167
self._emit(
158168
LiveTranscriptionEvents(LiveTranscriptionEvents.Open),
@@ -593,6 +603,11 @@ def finish(self) -> bool:
593603
"""
594604
self._logger.spam("LiveClient.finish ENTER")
595605

606+
# debug the threads
607+
for thread in threading.enumerate():
608+
self._logger.debug("before running thread: %s", thread.name)
609+
self._logger.debug("number of active threads: %s", threading.active_count())
610+
596611
# signal exit
597612
self._signal_exit()
598613

@@ -609,6 +624,11 @@ def finish(self) -> bool:
609624
self._listen_thread = None # type: ignore
610625
self._logger.notice("listening thread joined")
611626

627+
# debug the threads
628+
for thread in threading.enumerate():
629+
self._logger.debug("before running thread: %s", thread.name)
630+
self._logger.debug("number of active threads: %s", threading.active_count())
631+
612632
self._logger.notice("finish succeeded")
613633
self._logger.spam("LiveClient.finish LEAVE")
614634
return True

0 commit comments

Comments
 (0)