diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 48eaf939..0d3a77ff 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -2,9 +2,9 @@ name: Docs on: push: - branches: [ master, 'dev/*' ] + branches: [ master, develop ] pull_request: - branches: [ master ] + branches: [ master, develop ] jobs: build: @@ -29,8 +29,8 @@ jobs: cd doc && make html - name: Deploy Docs uses: peaceiris/actions-gh-pages@v3 - if: github.ref == 'refs/heads/master' + if: github.ref == 'refs/heads/master' # TODO: Deploy seperate develop-version of docs? with: github_token: ${{ secrets.GITHUB_TOKEN }} publish_dir: doc/_build/html - + diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 185caf2d..10396d7e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,9 +2,9 @@ name: Test on: push: - branches: [ master ] + branches: [ master, develop ] pull_request: - branches: [ master ] + branches: [ master, develop ] jobs: test: @@ -60,8 +60,8 @@ jobs: run: | python -m pip install --upgrade pip wheel pip install . - - name: Run eegnb install test - shell: bash + - name: Run eegnb cli test + shell: bash pip install -U psychtoolbox # JG_ADD run: | if [ "$RUNNER_OS" == "Linux" ]; then @@ -78,6 +78,11 @@ jobs: Xvfb :0 -screen 0 1024x768x24 -ac +extension GLX +render -noreset &> xvfb.log & export DISPLAY=:0 fi + eegnb --help + eegnb runexp --help + - name: Run tests + shell: bash + run: | pytest typecheck: diff --git a/.gitignore b/.gitignore index 7d84d133..73508d17 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,12 @@ __pycache__ -.venv - *.egg-info/ +# Created by tests +.coverage +coverage.xml +htmlcov +prof + # Built as part of docs doc/auto_examples doc/_build @@ -10,6 +14,9 @@ doc/_build # Built by auto_examples examples/visual_cueing/*.csv +# present for people who use pyenv +.venv + # tests/coverage artifacts .coverage coverage.xml diff --git a/doc/getting_started/installation.rst b/doc/getting_started/installation.rst index 6e971681..cbc603c2 100644 --- a/doc/getting_started/installation.rst +++ b/doc/getting_started/installation.rst @@ -51,7 +51,7 @@ Use the following commands to download the repo, create and activate a conda or git clone https://github.com/NeuroTechX/eeg-notebooks cd eeg-notebooks - + pip install -e . @@ -125,6 +125,9 @@ Start a jupyter notebooks session and you will be presented with the eeg-noteboo subject = 999 # a 'very British number' record_duration=120 + # Initiate EEG device + eeg_device = EEGDevice.create(device=board_name) + # Initiate EEG device eeg_device = EEG(device=board_name) diff --git a/doc/getting_started/running_experiments.md b/doc/getting_started/running_experiments.md index 810f4127..c0b5bac0 100644 --- a/doc/getting_started/running_experiments.md +++ b/doc/getting_started/running_experiments.md @@ -87,7 +87,7 @@ The first step is to import all of the necessary library dependencies. These are ```python from eegnb import generate_save_fn -from eegnb.devices.eeg import EEG +from eegnb.devices import EEGDevice from eegnb.experiments.visual_n170 import n170 ``` @@ -104,10 +104,10 @@ record_duration = 120 save_fn = generate_save_fn(board_name, experiment, subject, session) ``` -Next it is necessary to call the `eegnb.devices.eeg.EEG` class which handles all of the backend processes related to each device. +Next it is necessary to create an instance of the `eegnb.devices.EEGDevice` class which handles all of the backend processes related to each device. ```python -eeg_device = EEG(device=board_name) +eeg_device = EEGDevice.create(device_name=board_name) ``` Finally, we call the `present` method of the class corresponding to our desired experiment, in this case the visual N170. We pass both the EEG device and generated save file name in order to collect and save data. The presentation can also be run without an EEG device/save file for testing and debugging. @@ -120,7 +120,7 @@ All together the example script looks like ```python # Imports from eegnb import generate_save_fn -from eegnb.devices.eeg import EEG +from eegnb.devices import EEGDevice from eegnb.experiments.visual_n170 import n170 # Define some variables @@ -134,7 +134,7 @@ record_duration = 120 save_fn = generate_save_fn(board_name, experiment, subject, session) # Setup EEG device -eeg_device = EEG(device=board_name) +eeg_device = EEGDevice.create(device_name=board_name) # Run stimulus presentation n170.present(duration=record_duration, eeg=eeg_device, save_fn=save_fn) diff --git a/doc/getting_started/streaming.md b/doc/getting_started/streaming.md index d9f956b2..29f32510 100644 --- a/doc/getting_started/streaming.md +++ b/doc/getting_started/streaming.md @@ -4,14 +4,13 @@ Before getting going with running an experiment, it is important to first verify The exact steps for this varies with the device (MUSE, OpenBCI, others) and operating system (Windows, Mac, Linux) used. When using these instructions, you should make sure you are consulting the section appropriate for your combination of device and OS. -Initiating an EEG stream is a relatively easy process using the `eegnb.devices.eeg.EEG` class which abstracts the -the various devices and backends behind one easy call. +Initiating an EEG stream is a relatively easy process using the `eegnb.devices.EEGDevice` class which abstracts the various devices and backends behind one easy call. ```python -from eegnb.devices.eeg import EEG +from eegnb.devices import EEGDevice # define the name for the board you are using and call the EEG object -eeg = EEG(device='cyton') +eeg = EEGDevice.create(device='cyton') # start the stream eeg.start() @@ -111,10 +110,10 @@ menu pictures below. Now that we have the COM port, we can initiate the stream by passing it to the EEG device in the object call. ```python -from eegnb.devices.eeg import EEG +from eegnb.devices import EEGDevice # define the name for the board you are using and call the EEG object -eeg = EEG( +eeg = EEGDevice.create( device='cyton', serial_port='COM7' ) diff --git a/eegnb/cli/__main__.py b/eegnb/cli/__main__.py index 6e57fbce..94d6d961 100644 --- a/eegnb/cli/__main__.py +++ b/eegnb/cli/__main__.py @@ -1,9 +1,11 @@ -from eegnb import DATA_DIR import click -from time import sleep -from os import path +import logging import os import shutil +from os import path +from time import sleep + +from eegnb import DATA_DIR from eegnb.datasets.datasets import zip_data_folders from .introprompt import intro_prompt @@ -11,11 +13,13 @@ from eegnb.devices.eeg import EEG from eegnb.analysis.utils import check_report +logger = logging.getLogger(__name__) + @click.group(name="eegnb") def main(): """eeg-notebooks command line interface""" - pass + logging.basicConfig(level=logging.INFO) @main.command() @@ -34,7 +38,7 @@ def runexp( recdur: float = None, outfname: str = None, prompt: bool = False, - dosigqualcheck = True, + dosigqualcheck=True, ): """ Run experiment. @@ -59,31 +63,34 @@ def runexp( if prompt: eeg, experiment, recdur, outfname = intro_prompt() else: + from .utils import run_experiment + from eegnb.devices import EEGDevice + if eegdevice == "ganglion": - # if the ganglion is chosen a MAC address should also be provided - eeg = EEG(device=eegdevice, mac_addr=macaddr) + # if the ganglion is chosen a MAC address should also be proviced + eeg = EEGDevice.create(device_name=eegdevice, mac_addr=macaddr) else: - eeg = EEG(device=eegdevice) + if eegdevice: + eeg = EEGDevice.create(device_name=eegdevice) + else: + print("No EEG device provided, using a synthetic one.") def askforsigqualcheck(): do_sigqual = input("\n\nRun signal quality check? (y/n). Recommend y \n") - if do_sigqual == 'y': + if do_sigqual == "y": check_report(eeg) - elif do_sigqual != 'n': + elif do_sigqual != "n": "Sorry, didn't recognize answer. " askforsigqualcheck() - + if dosigqualcheck: askforsigqualcheck() - run_experiment(experiment, eeg, recdur, outfname) print(f"\n\n\nExperiment complete! Recorded data is saved @ {outfname}") - - @main.command() @click.option("-ed", "--eegdevice", help="EEG device to use", required=True) def checksigqual(eegdevice: str): @@ -124,7 +131,7 @@ def runzip(experiment: str, site: str, prompt: bool = False): $ eegnb runzip -ex visual-N170 $ eegnb runzip -ex visual-N170 -s local-ntcs-2 - + Launch the interactive command line to select experiment $ eegnb runzip -ip @@ -191,5 +198,17 @@ def localdata_report(): print(" {}".format(items)) +def test_cli(): + from click.testing import CliRunner + + runner = CliRunner() + result = runner.invoke(main, ["--help"]) + assert result.exit_code == 0 + + runner = CliRunner() + result = runner.invoke(runexp, ["--help"]) + assert result.exit_code == 0 + + if __name__ == "__main__": main() diff --git a/eegnb/cli/introprompt.py b/eegnb/cli/introprompt.py index 6eaf90c9..c2fda96f 100644 --- a/eegnb/cli/introprompt.py +++ b/eegnb/cli/introprompt.py @@ -1,14 +1,15 @@ -import os +import os from typing import Tuple from pathlib import Path from eegnb import generate_save_fn, DATA_DIR -from eegnb.devices.eeg import EEG +from eegnb.devices import EEGDevice from .utils import run_experiment, get_exp_desc, experiments -eegnb_sites = ['eegnb_examples', 'grifflab_dev', 'jadinlab_home'] +eegnb_sites = ["eegnb_examples", "grifflab_dev", "jadinlab_home"] -def device_prompt() -> EEG: + +def device_prompt() -> EEGDevice: # define the names of the available boards # boards is a mapping from board code to board description boards = { @@ -17,7 +18,7 @@ def device_prompt() -> EEG: "muse2": "Muse 2", "museS": "Muse S", "muse2016_bfn": "Muse 2016 - brainflow, native bluetooth", - "muse2016_bfb": "Muse 2016 - brainflow, BLED bluetooth dongle", + "muse2016_bfb": "Muse 2016 - brainflow, BLED bluetooth dongle", "muse2_bfn": "Muse 2 - brainflow, native bluetooth", "muse2_bfb": "Muse 2 - brainflow, BLED bluetooth dongle", "museS_bfn": "Muse S - brainflow, native bluetooth", @@ -61,9 +62,7 @@ def device_prompt() -> EEG: board_code = board_code + "_wifi" if board_code == "ganglion": # If the Ganglion is being used, you can enter optional Ganglion mac address - ganglion_mac_address = input( - "\nGanglion MAC Address (Press Enter to Autoscan): " - ) + mac_address = input("\nGanglion MAC Address (Press Enter to Autoscan): ") elif board_code == "ganglion_wifi": # IP address is required for this board configuration ip_address = input("\nEnter Ganglion+WiFi IP Address: ") @@ -72,21 +71,18 @@ def device_prompt() -> EEG: f"\n{board_desc} + WiFi is not supported. Please use the dongle that was shipped with the device.\n" ) exit() - + if board_code.startswith("ganglion"): if board_code == "ganglion_wifi": - eeg_device = EEG(device=board_code, ip_addr=ip_address) + return EEGDevice.create(device_name=board_code, ip_addr=ip_address) else: - eeg_device = EEG(device=board_code, mac_addr=ganglion_mac_address) + return EEGDevice.create(device_name=board_code, mac_addr=mac_address) else: - eeg_device = EEG(device=board_code) + return EEGDevice.create(device_name=board_code) - return eeg_device - - -def exp_prompt(runorzip:str='run') -> str: - print("\nPlease select which experiment you would like to %s: \n" %runorzip) +def exp_prompt(runorzip: str = "run") -> str: + print("\nPlease select which experiment you would like to %s: \n" % runorzip) print( "\n".join( [ @@ -102,28 +98,33 @@ def exp_prompt(runorzip:str='run') -> str: return exp_selection -def site_prompt(experiment:str) -> str: - experiment_dir=os.path.join(DATA_DIR,experiment) - + +def site_prompt(experiment: str) -> str: + experiment_dir = os.path.join(DATA_DIR, experiment) + if not (os.path.isdir(experiment_dir)): - print('Folder {} does not exist in {}\n'.format(experiment,DATA_DIR)) - raise ValueError ('Directory does not exist') + print("Folder {} does not exist in {}\n".format(experiment, DATA_DIR)) + raise ValueError("Directory does not exist") - if len(os.listdir(experiment_dir) ) == 0: - print('No subfolders exist in {}' .format(experiment_dir)) - raise ValueError ('Directory is empty') + if len(os.listdir(experiment_dir)) == 0: + print("No subfolders exist in {}".format(experiment_dir)) + raise ValueError("Directory is empty") - print("\nPlease select which experiment subfolder you would like to zip. Default 'local_ntcs'") + print( + "\nPlease select which experiment subfolder you would like to zip. Default 'local_ntcs'" + ) print("\nCurrent subfolders for experiment {}:\n".format(experiment)) - dirslist = [d for d in os.listdir(experiment_dir) if d not in eegnb_sites ] - for d in dirslist: print(d + '\n') - site=str(input('\nType folder name: ')) - if site=="": - site="local" + dirslist = [d for d in os.listdir(experiment_dir) if d not in eegnb_sites] + for d in dirslist: + print(d + "\n") + site = str(input("\nType folder name: ")) + if site == "": + site = "local" print("Selected Folder : {} \n".format(site)) return site + def intro_prompt() -> Tuple[EEG, str, int, str]: """This function handles the user prompts for inputting information about the session they wish to record.""" print("Welcome to NeurotechX EEG Notebooks\n") @@ -147,25 +148,25 @@ def intro_prompt() -> Tuple[EEG, str, int, str]: session_nb = int(input("Enter session #: ")) # ask if they are ready to begin - #print("\nEEG device successfully connected!") - #input("Press [ENTER] when ready to begin...") + # print("\nEEG device successfully connected!") + # input("Press [ENTER] when ready to begin...") # generate the save file name save_fn = generate_save_fn( - eeg_device.device_name, exp_selection, subj_id, session_nb + eeg_device.device_name, exp_selection, subj_id, session_nb ) return eeg_device, exp_selection, duration, str(save_fn) -def intro_prompt_zip() -> Tuple[str,str]: +def intro_prompt_zip() -> Tuple[str, str]: """This function handles the user prompts for inputting information for zipping their function.""" # ask the user which experiment to zip - exp_selection = exp_prompt(runorzip='zip') - site= site_prompt(exp_selection) - - return exp_selection,site + exp_selection = exp_prompt(runorzip="zip") + site = site_prompt(exp_selection) + + return exp_selection, site def main() -> None: diff --git a/eegnb/cli/utils.py b/eegnb/cli/utils.py index 00a769c5..184c3919 100644 --- a/eegnb/cli/utils.py +++ b/eegnb/cli/utils.py @@ -1,11 +1,10 @@ +from eegnb.devices import EEGDevice -#change the pref libraty to PTB and set the latency mode to high precision +# change the pref libraty to PTB and set the latency mode to high precision from psychopy import prefs -prefs.hardware['audioLib'] = 'PTB' -prefs.hardware['audioLatencyMode'] = 3 - -from eegnb.devices.eeg import EEG +prefs.hardware["audioLib"] = "PTB" +prefs.hardware["audioLatencyMode"] = 3 from eegnb.experiments.visual_n170 import n170 from eegnb.experiments.visual_p300 import p300 @@ -38,7 +37,7 @@ def get_exp_desc(exp: str): def run_experiment( - experiment: str, eeg_device: EEG, record_duration: float = None, save_fn=None + experiment: str, eeg_device: EEGDevice, record_duration: float = None, save_fn=None ): if experiment in experiments: module = experiments[experiment] diff --git a/eegnb/devices/__init__.py b/eegnb/devices/__init__.py index e69de29b..95552237 100644 --- a/eegnb/devices/__init__.py +++ b/eegnb/devices/__init__.py @@ -0,0 +1,5 @@ +from .base import EEGDevice +from .muse import MuseDevice +from ._brainflow import BrainflowDevice + +all_devices = MuseDevice.devices + BrainflowDevice.devices diff --git a/eegnb/devices/_brainflow.py b/eegnb/devices/_brainflow.py new file mode 100644 index 00000000..cc3acfde --- /dev/null +++ b/eegnb/devices/_brainflow.py @@ -0,0 +1,235 @@ +import logging +from time import sleep +from multiprocessing import Process +from typing import List, Tuple + +import numpy as np +import pandas as pd + +from brainflow import BoardShim, BoardIds, BrainFlowInputParams +from .base import EEGDevice, _check_samples + + +logger = logging.getLogger(__name__) + + +class BrainflowDevice(EEGDevice): + # list of brainflow devices + devices: List[str] = [ + "ganglion", + "ganglion_wifi", + "cyton", + "cyton_wifi", + "cyton_daisy", + "cyton_daisy_wifi", + "brainbit", + "unicorn", + "synthetic", + "brainbit", + "notion1", + "notion2", + "crown", + "museS_bfn", # bfn = brainflow with native bluetooth + "museS_bfb", # bfb = brainflow with BLED dongle bluetooth + "muse2_bfn", + "muse2_bfb", + "muse2016_bfn", + "muse2016_bfb", + ] + + def __init__( + self, + device_name: str, + serial_num=None, + serial_port=None, + mac_addr=None, + other=None, + ip_addr=None, + ): + EEGDevice.__init__(self, device_name) + self.serial_num = serial_num + self.serial_port = serial_port + self.mac_address = mac_addr + self.other = other + self.ip_addr = ip_addr + self.markers: List[Tuple[List[int], float]] = [] + self._init_brainflow() + + def start(self, filename: str = None, duration=None) -> None: + self.save_fn = filename + + def record(): + sleep(duration) + self._stop_brainflow() + + self.board.start_stream() + if duration: + logger.info( + "Starting background recording process, will save to file: %s" + % self.save_fn + ) + self.recording = Process(target=lambda: record()) + self.recording.start() + + def stop(self) -> None: + self._stop_brainflow() + + def push_sample(self, marker: List[int], timestamp: float): + last_timestamp = self.board.get_current_board_data(1)[-1][0] + self.markers.append((marker, last_timestamp)) + + def check(self, max_uv_abs=200) -> List[str]: + data = self.board.get_board_data() # will clear board buffer + # print(data) + channel_names = BoardShim.get_eeg_names(self.brainflow_id) + # FIXME: _check_samples expects different (Muse) inputs + checked = _check_samples(data.T, channel_names, max_uv_abs=max_uv_abs) # type: ignore + bads = [ch for ch, ok in checked.items() if not ok] + return bads + + def _init_brainflow(self) -> None: + """ + This function initializes the brainflow backend based on the input device name. It calls + a utility function to determine the appropriate USB port to use based on the current operating system. + Additionally, the system allows for passing a serial number in the case that they want to use either + the BrainBit or the Unicorn EEG devices from the brainflow family. + + Parameters: + serial_num (str or int): serial number for either the BrainBit or Unicorn devices. + """ + from eegnb.devices.utils import get_openbci_usb + + # Initialize brainflow parameters + self.brainflow_params = BrainFlowInputParams() + + device_name_to_id = { + "ganglion": BoardIds.GANGLION_BOARD.value, + "ganglion_wifi": BoardIds.GANGLION_WIFI_BOARD.value, + "cyton": BoardIds.CYTON_BOARD.value, + "cyton_wifi": BoardIds.CYTON_WIFI_BOARD.value, + "cyton_daisy": BoardIds.CYTON_DAISY_BOARD.value, + "cyton_daisy_wifi": BoardIds.CYTON_DAISY_WIFI_BOARD.value, + "brainbit": BoardIds.BRAINBIT_BOARD.value, + "unicorn": BoardIds.UNICORN_BOARD.value, + "callibri_eeg": BoardIds.CALLIBRI_EEG_BOARD.value, + "notion1": BoardIds.NOTION_1_BOARD.value, + "notion2": BoardIds.NOTION_2_BOARD.value, + "crown": BoardIds.CROWN_BOARD.value, + "museS_bfn": BoardIds.MUSE_S_BOARD.value, + "museS_bfb": BoardIds.MUSE_S_BLED_BOARD.value, + "muse2_bfn": BoardIds.MUSE_2_BOARD.value, + "muse2_bfb": BoardIds.MUSE_2_BLED_BOARD.value, + "muse2016_bfn": BoardIds.MUSE_2016_BOARD.value, + "muse2016_bfb": BoardIds.MUSE_2016_BLED_BOARD.value, + "synthetic": BoardIds.SYNTHETIC_BOARD.value, + } + + # validate mapping + assert all(name in device_name_to_id for name in self.devices) + + self.brainflow_id = device_name_to_id[self.device_name] + + if self.device_name == "ganglion": + if self.serial_port is None: + self.brainflow_params.serial_port = get_openbci_usb() + # set mac address parameter in case + if self.mac_address is None: + logger.info( + "No MAC address provided, attempting to connect without one" + ) + else: + self.brainflow_params.mac_address = self.mac_address + + elif self.device_name in ["ganglion_wifi", "cyton_wifi", "cyton_daisy_wifi"]: + if self.ip_addr is not None: + self.brainflow_params.ip_address = self.ip_addr + + elif self.device_name in ["cyton", "cyton_daisy"]: + if self.serial_port is None: + self.brainflow_params.serial_port = get_openbci_usb() + + elif self.device_name == "callibri_eeg": + if self.other: + self.brainflow_params.other_info = str(self.other) + + # some devices allow for an optional serial number parameter for better connection + if self.serial_num: + self.brainflow_params.serial_number = str(self.serial_num) + + if self.serial_port: + self.brainflow_params.serial_port = str(self.serial_port) + + # Initialize board_shim + self.sfreq = BoardShim.get_sampling_rate(self.brainflow_id) + self.board = BoardShim(self.brainflow_id, self.brainflow_params) + self.board.prepare_session() + + def get_data(self) -> pd.DataFrame: + from eegnb.devices.utils import create_stim_array + + data = self.board.get_board_data() # will clear board buffer + + # transform data for saving + data = data.T # transpose data + print(data) + + # get the channel names for EEG data + if self.brainflow_id == BoardIds.GANGLION_BOARD.value: + # if a ganglion is used, use recommended default EEG channel names + ch_names = ["fp1", "fp2", "tp7", "tp8"] + else: + # otherwise select eeg channel names via brainflow API + ch_names = BoardShim.get_eeg_names(self.brainflow_id) + + # pull EEG channel data via brainflow API + eeg_data = data[:, BoardShim.get_eeg_channels(self.brainflow_id)] + timestamps = data[:, BoardShim.get_timestamp_channel(self.brainflow_id)] + + # Create a column for the stimuli to append to the EEG data + stim_array = create_stim_array(timestamps, self.markers) + timestamps = timestamps[ + ..., None + ] # Add an additional dimension so that shapes match + total_data = np.append(timestamps, eeg_data, 1) + total_data = np.append( + total_data, stim_array, 1 + ) # Append the stim array to data. + + # Subtract five seconds of settling time from beginning + # total_data = total_data[5 * self.sfreq :] + df = pd.DataFrame(total_data, columns=["timestamps"] + ch_names + ["stim"]) + return df + + def _save(self) -> None: + """Saves the data to a CSV file.""" + assert self.save_fn + df = self.get_data() + df.to_csv(self.save_fn, index=False) + + def _stop_brainflow(self) -> None: + """This functions kills the brainflow backend and saves the data to a CSV file.""" + # Collect session data and kill session + if self.save_fn: + self._save() + self.board.stop_stream() + self.board.release_session() + + +def test_check(): + device = BrainflowDevice(device_name="synthetic") + with device: + sleep(2) # is 2s really needed? + bads = device.check(max_uv_abs=300) + # Seems to blink between the two... + assert bads == ["F6", "F8"] or bads == ["F4", "F6", "F8"] + # print(bads) + # assert not bads + + +def test_get_data(): + device = BrainflowDevice(device_name="synthetic") + with device: + sleep(2) + df = device.get_data() + print(df) + assert not df.empty diff --git a/eegnb/devices/base.py b/eegnb/devices/base.py new file mode 100644 index 00000000..175efc6d --- /dev/null +++ b/eegnb/devices/base.py @@ -0,0 +1,114 @@ +""" +Abstraction for the various supported EEG devices. +""" + +import logging +from typing import List, Dict +from abc import ABCMeta, abstractmethod + +import numpy as np +from .utils import EEG_INDICES, EEG_CHANNELS, SAMPLE_FREQS + + +logger = logging.getLogger(__name__) + + +def _check_samples( + buffer: np.ndarray, channels: List[str], max_uv_abs=200 +) -> Dict[str, bool]: + # TODO: Better signal quality check + chmax = dict(zip(channels, np.max(np.abs(buffer), axis=0))) + return {ch: maxval < max_uv_abs for ch, maxval in chmax.items()} + + +def test_check_samples(): + buffer = np.array([[9.0, 11.0, -5, -13]]) + assert {"TP9": True, "AF7": False, "AF8": True, "TP10": False} == _check_samples( + buffer, channels=["TP9", "AF7", "AF8", "TP10"], max_uv_abs=10 + ) + + +class EEGDevice(metaclass=ABCMeta): + def __init__(self, device: str) -> None: + """ + The initialization function takes the name of the EEG device and initializes the appropriate backend. + + Parameters: + device (str): name of eeg device used for reading data. + """ + self.device_name = device + + @classmethod + def create(cls, device_name: str, *args, **kwargs) -> "EEGDevice": + from .muse import MuseDevice + from ._brainflow import BrainflowDevice + + if device_name in BrainflowDevice.devices: + return BrainflowDevice(device_name) + elif device_name in MuseDevice.devices: + return MuseDevice(device_name) + else: + raise ValueError(f"Invalid device name: {device_name}") + + def __enter__(self): + self.start() + + def __exit__(self, *args): + self.stop() + + @abstractmethod + def start(self, filename: str = None, duration=None): + """ + Starts the EEG device based on the defined backend. + + Parameters: + filename (str): name of the file to save the sessions data to. + """ + raise NotImplementedError + + @abstractmethod + def stop(self): + raise NotImplementedError + + @abstractmethod + def push_sample(self, marker: List[int], timestamp: float): + """ + Push a marker and its timestamp to store alongside the EEG data. + + Parameters: + marker (int): marker number for the stimuli being presented. + timestamp (float): timestamp of stimulus onset from time.time() function. + """ + raise NotImplementedError + + def get_samples(self): + raise NotImplementedError + + @abstractmethod + def check(self): + raise NotImplementedError + + @property + def n_channels(self) -> int: + return len(EEG_INDICES[self.device_name]) + + @property + def sfreq(self) -> int: + return SAMPLE_FREQS[self.device_name] + + @property + def channels(self) -> List[str]: + return EEG_CHANNELS[self.device_name] + + +def test_create(): + device = EEGDevice.create("synthetic") + assert device + + +def test_instantiate_should_fail(): + # abstract base class should not be instantiated on its own + import pytest + + with pytest.raises(TypeError): + EEGDevice("test") # type: ignore diff --git a/eegnb/devices/eeg.py b/eegnb/devices/eeg.py deleted file mode 100644 index 6b22a377..00000000 --- a/eegnb/devices/eeg.py +++ /dev/null @@ -1,450 +0,0 @@ -""" Abstraction for the various supported EEG devices. - - 1. Determine which backend to use for the board. - 2. - -""" - -import sys -import time -import logging -from time import sleep -from multiprocessing import Process - -import numpy as np -import pandas as pd - -from brainflow import BoardShim, BoardIds, BrainFlowInputParams -from muselsl import stream, list_muses, record, constants as mlsl_cnsts -from pylsl import StreamInfo, StreamOutlet, StreamInlet, resolve_byprop - -from eegnb.devices.utils import get_openbci_usb, create_stim_array,SAMPLE_FREQS,EEG_INDICES,EEG_CHANNELS - - -logger = logging.getLogger(__name__) - -# list of brainflow devices -brainflow_devices = [ - "ganglion", - "ganglion_wifi", - "cyton", - "cyton_wifi", - "cyton_daisy", - "cyton_daisy_wifi", - "brainbit", - "unicorn", - "synthetic", - "brainbit", - "notion1", - "notion2", - "freeeeg32", - "crown", - "museS_bfn", # bfn = brainflow with native bluetooth; - "museS_bfb", # bfb = brainflow with BLED dongle bluetooth - "muse2_bfn", - "muse2_bfb", - "muse2016_bfn", - "muse2016_bfb" -] - - -class EEG: - device_name: str - stream_started: bool = False - def __init__( - self, - device=None, - serial_port=None, - serial_num=None, - mac_addr=None, - other=None, - ip_addr=None, - ): - """The initialization function takes the name of the EEG device and determines whether or not - the device belongs to the Muse or Brainflow families and initializes the appropriate backend. - - Parameters: - device (str): name of eeg device used for reading data. - """ - # determine if board uses brainflow or muselsl backend - self.device_name = device - self.serial_num = serial_num - self.serial_port = serial_port - self.mac_address = mac_addr - self.ip_addr = ip_addr - self.other = other - self.backend = self._get_backend(self.device_name) - self.initialize_backend() - self.n_channels = len(EEG_INDICES[self.device_name]) - self.sfreq = SAMPLE_FREQS[self.device_name] - self.channels = EEG_CHANNELS[self.device_name] - - def initialize_backend(self): - if self.backend == "brainflow": - self._init_brainflow() - self.timestamp_channel = BoardShim.get_timestamp_channel(self.brainflow_id) - elif self.backend == "muselsl": - self._init_muselsl() - self._muse_get_recent() # run this at initialization to get some - # stream metadata into the eeg class - - def _get_backend(self, device_name): - if device_name in brainflow_devices: - return "brainflow" - elif device_name in ["muse2016", "muse2", "museS"]: - return "muselsl" - - ##################### - # MUSE functions # - ##################### - def _init_muselsl(self): - # Currently there's nothing we need to do here. However keeping the - # option open to add things with this init method. - self._muse_recent_inlet = None - - def _start_muse(self, duration): - if sys.platform in ["linux", "linux2", "darwin"]: - # Look for muses - self.muses = list_muses() - # self.muse = muses[0] - - # Start streaming process - self.stream_process = Process( - target=stream, args=(self.muses[0]["address"],) - ) - self.stream_process.start() - - # Create markers stream outlet - self.muse_StreamInfo = StreamInfo( - "Markers", "Markers", 1, 0, "int32", "myuidw43536" - ) - self.muse_StreamOutlet = StreamOutlet(self.muse_StreamInfo) - - # Start a background process that will stream data from the first available Muse - print("starting background recording process") - if self.save_fn: - print("will save to file: %s" % self.save_fn) - self.recording = Process(target=record, args=(duration, self.save_fn)) - self.recording.start() - - time.sleep(5) - self.stream_started = True - self.push_sample([99], timestamp=time.time()) - - def _stop_muse(self): - pass - - def _muse_push_sample(self, marker, timestamp): - self.muse_StreamOutlet.push_sample(marker, timestamp) - - def _muse_get_recent(self, n_samples: int=256, restart_inlet: bool=False): - if self._muse_recent_inlet and not restart_inlet: - inlet = self._muse_recent_inlet - else: - # Initiate a new lsl stream - streams = resolve_byprop("type", "EEG", timeout=mlsl_cnsts.LSL_SCAN_TIMEOUT) - if not streams: - raise Exception("Couldn't find any stream, is your device connected?") - inlet = StreamInlet(streams[0], max_chunklen=mlsl_cnsts.LSL_EEG_CHUNK) - self._muse_recent_inlet = inlet - - info = inlet.info() - sfreq = info.nominal_srate() - description = info.desc() - n_chans = info.channel_count() - - self.sfreq = sfreq - self.info = info - self.n_chans = n_chans - - timeout = (n_samples/sfreq)+0.5 - samples, timestamps = inlet.pull_chunk(timeout=timeout, - max_samples=n_samples) - - samples = np.array(samples) - timestamps = np.array(timestamps) - - ch = description.child("channels").first_child() - ch_names = [ch.child_value("label")] - for i in range(n_chans): - ch = ch.next_sibling() - lab = ch.child_value("label") - if lab != "": - ch_names.append(lab) - - df = pd.DataFrame(samples, index=timestamps, columns=ch_names) - return df - - ########################## - # BrainFlow functions # - ########################## - def _init_brainflow(self): - """This function initializes the brainflow backend based on the input device name. It calls - a utility function to determine the appropriate USB port to use based on the current operating system. - Additionally, the system allows for passing a serial number in the case that they want to use either - the BraintBit or the Unicorn EEG devices from the brainflow family. - - Parameters: - serial_num (str or int): serial number for either the BrainBit or Unicorn devices. - """ - # Initialize brainflow parameters - self.brainflow_params = BrainFlowInputParams() - - if self.device_name == "ganglion": - self.brainflow_id = BoardIds.GANGLION_BOARD.value - if self.serial_port is None: - self.brainflow_params.serial_port = get_openbci_usb() - # set mac address parameter in case - if self.mac_address is None: - print("No MAC address provided, attempting to connect without one") - else: - self.brainflow_params.mac_address = self.mac_address - - elif self.device_name == "ganglion_wifi": - self.brainflow_id = BoardIds.GANGLION_WIFI_BOARD.value - if self.ip_addr is not None: - self.brainflow_params.ip_address = self.ip_addr - self.brainflow_params.ip_port = 6677 - - elif self.device_name == "cyton": - self.brainflow_id = BoardIds.CYTON_BOARD.value - if self.serial_port is None: - self.brainflow_params.serial_port = get_openbci_usb() - - elif self.device_name == "cyton_wifi": - self.brainflow_id = BoardIds.CYTON_WIFI_BOARD.value - if self.ip_addr is not None: - self.brainflow_params.ip_address = self.ip_addr - self.brainflow_params.ip_port = 6677 - - elif self.device_name == "cyton_daisy": - self.brainflow_id = BoardIds.CYTON_DAISY_BOARD.value - if self.serial_port is None: - self.brainflow_params.serial_port = get_openbci_usb() - - elif self.device_name == "cyton_daisy_wifi": - self.brainflow_id = BoardIds.CYTON_DAISY_WIFI_BOARD.value - if self.ip_addr is not None: - self.brainflow_params.ip_address = self.ip_addr - - elif self.device_name == "brainbit": - self.brainflow_id = BoardIds.BRAINBIT_BOARD.value - - elif self.device_name == "unicorn": - self.brainflow_id = BoardIds.UNICORN_BOARD.value - - elif self.device_name == "callibri_eeg": - self.brainflow_id = BoardIds.CALLIBRI_EEG_BOARD.value - if self.other: - self.brainflow_params.other_info = str(self.other) - - elif self.device_name == "notion1": - self.brainflow_id = BoardIds.NOTION_1_BOARD.value - - elif self.device_name == "notion2": - self.brainflow_id = BoardIds.NOTION_2_BOARD.value - - elif self.device_name == "crown": - self.brainflow_id = BoardIds.CROWN_BOARD.value - - elif self.device_name == "freeeeg32": - self.brainflow_id = BoardIds.FREEEEG32_BOARD.value - if self.serial_port is None: - self.brainflow_params.serial_port = get_openbci_usb() - - elif self.device_name == "museS_bfn": - self.brainflow_id = BoardIds.MUSE_S_BOARD.value - - elif self.device_name == "museS_bfb": - self.brainflow_id = BoardIds.MUSE_S_BLED_BOARD.value - - elif self.device_name == "muse2_bfn": - self.brainflow_id = BoardIds.MUSE_2_BOARD.value - - elif self.device_name == "muse2_bfb": - self.brainflow_id = BoardIds.MUSE_2_BLED_BOARD.value - - elif self.device_name == "muse2016_bfn": - self.brainflow_id = BoardIds.MUSE_2016_BOARD.value - - elif self.device_name == "muse2016_bfb": - self.brainflow_id = BoardIds.MUSE_2016_BLED_BOARD.value - - elif self.device_name == "synthetic": - self.brainflow_id = BoardIds.SYNTHETIC_BOARD.value - - # some devices allow for an optional serial number parameter for better connection - if self.serial_num: - serial_num = str(self.serial_num) - self.brainflow_params.serial_number = serial_num - - if self.serial_port: - serial_port = str(self.serial_port) - self.brainflow_params.serial_port = serial_port - - # Initialize board_shim - self.sfreq = BoardShim.get_sampling_rate(self.brainflow_id) - self.board = BoardShim(self.brainflow_id, self.brainflow_params) - self.board.prepare_session() - - def _start_brainflow(self): - # only start stream if non exists - if not self.stream_started: - self.board.start_stream() - - self.stream_started = True - - # wait for signal to settle - if (self.device_name.find("cyton") != -1) or (self.device_name.find("ganglion") != -1): - # wait longer for openbci cyton / ganglion - sleep(10) - else: - sleep(5) - - def _stop_brainflow(self): - """This functions kills the brainflow backend and saves the data to a CSV file.""" - - # Collect session data and kill session - data = self.board.get_board_data() # will clear board buffer - self.board.stop_stream() - self.board.release_session() - - # Extract relevant metadata from board - ch_names, eeg_data, timestamps = self._brainflow_extract(data) - - # Create a column for the stimuli to append to the EEG data - stim_array = create_stim_array(timestamps, self.markers) - timestamps = timestamps[ ..., None ] - - # Add an additional dimension so that shapes match - total_data = np.append(timestamps, eeg_data, 1) - - # Append the stim array to data. - total_data = np.append(total_data, stim_array, 1) - - # Subtract five seconds of settling time from beginning - total_data = total_data[5 * self.sfreq :] - data_df = pd.DataFrame(total_data, columns=["timestamps"] + ch_names + ["stim"]) - data_df.to_csv(self.save_fn, index=False) - - - - def _brainflow_extract(self, data): - """ - Formats the data returned from brainflow to get - ch_names; list of channel names - eeg_data: NDArray of eeg samples - timestamps: NDArray of timestamps - """ - - # transform data for saving - data = data.T # transpose data - - # get the channel names for EEG data - if ( - self.brainflow_id == BoardIds.GANGLION_BOARD.value - or self.brainflow_id == BoardIds.GANGLION_WIFI_BOARD.value - ): - # if a ganglion is used, use recommended default EEG channel names - ch_names = ["fp1", "fp2", "tp7", "tp8"] - elif self.brainflow_id == BoardIds.FREEEEG32_BOARD.value: - ch_names = [f"eeg_{i}" for i in range(0, 32)] - else: - # otherwise select eeg channel names via brainflow API - ch_names = BoardShim.get_eeg_names(self.brainflow_id) - - # pull EEG channel data via brainflow API - eeg_data = data[:, BoardShim.get_eeg_channels(self.brainflow_id)] - timestamps = data[:, BoardShim.get_timestamp_channel(self.brainflow_id)] - - return ch_names,eeg_data,timestamps - - - def _brainflow_push_sample(self, marker): - last_timestamp = self.board.get_current_board_data(1)[self.timestamp_channel][0] - self.markers.append([marker, last_timestamp]) - - - def _brainflow_get_recent(self, n_samples=256): - - # initialize brainflow if not set - if self.board == None: - self._init_brainflow() - - # start branflow stream - self._start_brainflow() - - # get the latest data - data = self.board.get_current_board_data(n_samples) - - ch_names, eeg_data, timestamps = self._brainflow_extract(data) - - eeg_data = np.array(eeg_data) - timestamps = np.array(timestamps) - - df = pd.DataFrame(eeg_data, index=timestamps, columns=ch_names) - # print (df) - return df - - ################################# - # Highlevel device functions # - ################################# - - def start(self, fn, duration=None): - """Starts the EEG device based on the defined backend. - - Parameters: - fn (str): name of the file to save the sessions data to. - """ - if fn: - self.save_fn = fn - - if self.backend == "brainflow": - self._start_brainflow() - self.markers = [] - elif self.backend == "muselsl": - self._start_muse(duration) - - - def push_sample(self, marker, timestamp): - """ - Universal method for pushing a marker and its timestamp to store alongside the EEG data. - - Parameters: - marker (int): marker number for the stimuli being presented. - timestamp (float): timestamp of stimulus onset from time.time() function. - """ - if self.backend == "brainflow": - self._brainflow_push_sample(marker=marker) - elif self.backend == "muselsl": - self._muse_push_sample(marker=marker, timestamp=timestamp) - - def stop(self): - if self.backend == "brainflow": - self._stop_brainflow() - elif self.backend == "muselsl": - pass - - def get_recent(self, n_samples: int = 256): - """ - Usage: - ------- - from eegnb.devices.eeg import EEG - this_eeg = EEG(device='museS') - df_rec = this_eeg.get_recent() - """ - - if self.backend == "brainflow": - df = self._brainflow_get_recent(n_samples) - elif self.backend == "muselsl": - df = self._muse_get_recent(n_samples) - else: - raise ValueError(f"Unknown backend {self.backend}") - - # Sort out the sensor coils - sorted_cols = sorted(df.columns) - df = df[sorted_cols] - - - return df - diff --git a/eegnb/devices/muse.py b/eegnb/devices/muse.py new file mode 100644 index 00000000..294e04ab --- /dev/null +++ b/eegnb/devices/muse.py @@ -0,0 +1,133 @@ +import sys +import logging +from time import time, sleep +from multiprocessing import Process +from typing import List, Optional + +import numpy as np +import muselsl +import pylsl + +from .base import EEGDevice, _check_samples + +logger = logging.getLogger(__name__) + +BACKEND = "bleak" +CHANNELS_MUSE = ["TP9", "AF7", "AF8", "TP10"] + + +def stream(address, sources): + muselsl.stream( + address, + backend=BACKEND, + ppg_enabled="PPG" in sources, + acc_enabled="ACC" in sources, + gyro_enabled="GYRO" in sources, + ) + + +def record(duration, filename, data_source="EEG"): + muselsl.record(duration=duration, filename=filename, data_source=data_source) + + +class MuseDevice(EEGDevice): + # list of muse devices + devices = [ + "muse2016", + "muse2", + "museS", + ] + + def __init__(self, device_name: str): + EEGDevice.__init__(self, device_name) + self.stream_process: Optional[Process] = None + + @property + def started(self) -> bool: + if self.stream_process: + return self.stream_process.exitcode is None + return False + + def start(self, filename: str = None, duration=None): + """ + Starts the EEG device. + + Parameters: + filename (str): name of the file to save the sessions data to. + """ + sources = ["EEG"] # + ["PPG", "ACC", "GYRO"] + if not duration: + duration = 300 + + # Not sure why we only do this on *nix + # Makes it seem like streaming is only supported on *nix? + if not self.started and sys.platform in ["linux", "linux2", "darwin"]: + # Look for muses + muses = muselsl.list_muses(backend=BACKEND) + # FIXME: fix upstream + muses = [m for m in muses if m["name"].startswith("Muse")] + if not muses: + raise Exception("No Muses found") + + # self.muse = muses[0] + + # Start streaming process + # daemon=False to ensure orderly shutdown/disconnection + stream_process = Process( + target=stream, args=(muses[0]["address"], sources), daemon=False + ) + stream_process.start() + self.stream_process = stream_process + + # Create markers stream outlet + self.marker_outlet = pylsl.StreamOutlet( + pylsl.StreamInfo("Markers", "Markers", 1, 0, "int32", "myuidw43536") + ) + + self.record(sources, duration, filename) + + # FIXME: What's the purpose of this? (Push sample indicating recording start?) + self.push_sample([99], timestamp=time()) + + def record(self, sources: List[str], duration, filename): + # Start a background process that will stream data from the first available Muse + for source in sources: + logger.info("Starting background recording process") + rec_process = Process( + target=record, args=(duration, filename, source), daemon=True + ) + rec_process.start() + + def stop(self): + pass + + def push_sample(self, marker: List[int], timestamp: float): + self.marker_outlet.push_sample(marker, timestamp) + + def _read_buffer(self) -> np.ndarray: + from eegwatch.lslutils import _get_inlets + + inlets = _get_inlets(verbose=False) + + for i in range(5): + for inlet in inlets: + inlet.pull(timeout=0.5) # type: ignore + inlets = [inlet for inlet in inlets if inlet.buffer.any()] # type: ignore + if inlets: + break + else: + logger.info("No inlets with data, trying again in a second...") + sleep(1) + + if not inlets: + raise Exception("No inlets found") + + inlet = inlets[0] + return inlet.buffer # type: ignore + + def check(self) -> List[str]: + checked = _check_samples( + self._read_buffer(), channels=["TP9", "AF7", "AF8", "TP10"] + ) + bads = [ch for ch, ok in checked.items() if not ok] + return bads diff --git a/examples/visual_n170/00x__n170_run_experiment.py b/examples/visual_n170/00x__n170_run_experiment.py index 2258aa08..cc3ff7b5 100644 --- a/examples/visual_n170/00x__n170_run_experiment.py +++ b/examples/visual_n170/00x__n170_run_experiment.py @@ -7,14 +7,13 @@ """ -################################################################################################### +################################################################################################### # Setup -# --------------------- -# +# --------------------- +# # Imports -import os from eegnb import generate_save_fn -from eegnb.devices.eeg import EEG +from eegnb.devices import EEGDevice from eegnb.experiments.visual_n170 import n170 # Define some variables @@ -29,14 +28,14 @@ # --------------------- # # Start EEG device -eeg_device = EEG(device=board_name) +eeg_device = EEGDevice.create(device_name=board_name) # Create save file name save_fn = generate_save_fn(board_name, experiment, subject_id, session_nb) print(save_fn) -################################################################################################### +################################################################################################### # Run experiment -# --------------------- -# +# --------------------- +# n170.present(duration=record_duration, eeg=eeg_device, save_fn=save_fn) diff --git a/examples/visual_p300/00x__p300_run_experiment.py b/examples/visual_p300/00x__p300_run_experiment.py index 8edcdbfd..d9a04740 100644 --- a/examples/visual_p300/00x__p300_run_experiment.py +++ b/examples/visual_p300/00x__p300_run_experiment.py @@ -7,14 +7,13 @@ """ -################################################################################################### +################################################################################################### # Setup -# --------------------- -# +# --------------------- +# # Imports -import os from eegnb import generate_save_fn -from eegnb.devices.eeg import EEG +from eegnb.devices import EEGDevice from eegnb.experiments.visual_p300 import p300 # Define some variables @@ -29,14 +28,14 @@ # --------------------- # # Start EEG device -eeg_device = EEG(device=board_name) +eeg_device = EEGDevice.create(device_name=board_name) # Create save file name save_fn = generate_save_fn(board_name, experiment, subject_id, session_nb) print(save_fn) -################################################################################################### +################################################################################################### # Run experiment -# --------------------- -# +# --------------------- +# p300.present(duration=record_duration, eeg=eeg_device, save_fn=save_fn) diff --git a/examples/visual_ssvep/00x__ssvep_run_experiment.py b/examples/visual_ssvep/00x__ssvep_run_experiment.py index 1741760e..1c632d4b 100644 --- a/examples/visual_ssvep/00x__ssvep_run_experiment.py +++ b/examples/visual_ssvep/00x__ssvep_run_experiment.py @@ -2,19 +2,18 @@ SSVEP run experiment =============================== -This example demonstrates the initiation of an EEG stream with eeg-notebooks, and how to run -an experiment. +This example demonstrates the initiation of an EEG stream with eeg-notebooks, and how to run +an experiment. """ -################################################################################################### +################################################################################################### # Setup -# --------------------- -# +# --------------------- +# # Imports -import os from eegnb import generate_save_fn -from eegnb.devices.eeg import EEG +from eegnb.devices import EEGDevice from eegnb.experiments.visual_ssvep import ssvep # Define some variables @@ -29,14 +28,14 @@ # --------------------- # # Start EEG device -eeg_device = EEG(device=board_name) +eeg_device = EEGDevice.create(device_name=board_name) # Create save file name save_fn = generate_save_fn(board_name, experiment, subject_id, session_nb) print(save_fn) -################################################################################################### +################################################################################################### # Run experiment -# --------------------- -# +# --------------------- +# ssvep.present(duration=record_duration, eeg=eeg_device, save_fn=save_fn) diff --git a/pyproject.toml b/pyproject.toml index de7bf0dc..e3dee674 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ addopts = """ --current-env --ignore-glob 'examples/**.py' --ignore-glob '**/baseline_task.py' + --profile """ testpaths = [ "eegnb", @@ -16,3 +17,12 @@ testpaths = [ #"examples", ] python_files = ["*.py", "*.ipynb"] + +[tool.coverage.report] +exclude_lines = [ + # Have to re-enable the standard pragma + "pragma: no cover", + # Don't complain if tests don't hit defensive assertion code + "raise NotImplementedError" +] +omit = ["eegnb/experiments/*"] diff --git a/requirements.txt b/requirements.txt index c35cafef..25aadb35 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,7 +24,7 @@ wxPython>=4.0 ; platform_system == "Linux" click # can be removed once minimum version is Python 3.7 -dataclasses; python_version == '3.6' +dataclasses; python_version == '3.6' # pywinhook needs some special treatment since there are only wheels on PyPI for Python 3.7-3.8, and building requires special tools (swig, VS C++ tools) # See issue: https://github.com/NeuroTechX/eeg-notebooks/issues/29 @@ -36,9 +36,12 @@ pywinhook @ https://github.com/ActivityWatch/wheels/raw/master/pywinhook/pyWinho pyglet==1.4.10 ; platform_system == "Windows" # Test requirements -mypy +mypy>=0.790 pytest pytest-cov +pytest-profiling +pytest-xvfb +coverage[toml] nbval # Types