diff --git a/readchar/__init__.py b/readchar/__init__.py index 9f1bac2..5687bb8 100644 --- a/readchar/__init__.py +++ b/readchar/__init__.py @@ -1,7 +1,7 @@ """Library to easily read single chars and key strokes""" -__version__ = "4.0.4-dev0" -__all__ = ["readchar", "readkey", "key", "config"] +__version__ = "4.1.0-dev2" +__all__ = ["readchar", "readkey", "ReadChar", "key", "config"] from sys import platform @@ -10,9 +10,9 @@ if platform.startswith(("linux", "darwin", "freebsd")): from . import _posix_key as key - from ._posix_read import readchar, readkey + from ._posix_read import ReadChar, readchar, readkey elif platform in ("win32", "cygwin"): from . import _win_key as key - from ._win_read import readchar, readkey + from ._win_read import ReadChar, readchar, readkey else: raise NotImplementedError(f"The platform {platform} is not supported yet") diff --git a/readchar/_posix_read.py b/readchar/_posix_read.py index 37f456f..463538f 100644 --- a/readchar/_posix_read.py +++ b/readchar/_posix_read.py @@ -1,9 +1,87 @@ import sys import termios +from copy import copy +from io import StringIO +from select import select from ._config import config +class ReadChar: + """A ContextManager allowing for keypress collection without requiering the user to + confirm presses with ENTER. Can be used non-blocking while inside the context.""" + + def __init__(self, cfg: config = None) -> None: + self.config = cfg if cfg is not None else config + self._buffer = StringIO() + + def __enter__(self) -> "ReadChar": + self.fd = sys.stdin.fileno() + term = termios.tcgetattr(self.fd) + self.old_settings = copy(term) + + term[3] &= ~( + termios.ICANON # don't require ENTER + | termios.ECHO # don't echo + | termios.IGNBRK + | termios.BRKINT + ) + term[6][termios.VMIN] = 0 # imideatly process every input + term[6][termios.VTIME] = 0 + termios.tcsetattr(self.fd, termios.TCSAFLUSH, term) + return self + + def __exit__(self, type, value, traceback) -> None: + termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_settings) + + def __update(self) -> None: + """check stdin and update the interal buffer if it holds data""" + if sys.stdin in select([sys.stdin], [], [], 0)[0]: + pos = self._buffer.tell() + data = sys.stdin.read() + self._buffer.write(data) + self._buffer.seek(pos) + + @property + def key_waiting(self) -> bool: + """True if a key has been pressed and is waiting to be read. False if not.""" + self.__update() + pos = self._buffer.tell() + next_byte = self._buffer.read(1) + self._buffer.seek(pos) + return bool(next_byte) + + def char(self) -> str: + """Reads a singel char from the input stream and returns it as a string of + length one. Does not require the user to press ENTER.""" + self.__update() + return self._buffer.read(1) + + def key(self) -> str: + """Reads a keypress from the input stream and returns it as a string. Keypressed + consisting of multiple characterrs will be read completly and be returned as a + string matching the definitions in `key.py`. + Does not require the user to press ENTER.""" + self.__update() + + c1 = self.char() + if c1 in self.config.INTERRUPT_KEYS: + raise KeyboardInterrupt + if c1 != "\x1B": + return c1 + c2 = self.char() + if c2 not in "\x4F\x5B": + return c1 + c2 + c3 = self.char() + if c3 not in "\x31\x32\x33\x35\x36": + return c1 + c2 + c3 + c4 = self.char() + if c4 not in "\x30\x31\x33\x34\x35\x37\x38\x39": + return c1 + c2 + c3 + c4 + c5 = self.char() + return c1 + c2 + c3 + c4 + c5 + + # Initially taken from: # http://code.activestate.com/recipes/134892/ # Thanks to Danny Yoo diff --git a/readchar/_win_read.py b/readchar/_win_read.py index c3c51c7..bfc8049 100644 --- a/readchar/_win_read.py +++ b/readchar/_win_read.py @@ -1,8 +1,60 @@ import msvcrt +import signal +from . import _win_key as key from ._config import config +class ReadChar: + """A ContextManager allowing for keypress collection without requiering the user to + confirm presses with ENTER. Can be used non-blocking while inside the context.""" + + @staticmethod + def __silent_CTRL_C_callback(signum, frame): + msvcrt.ungetch(key.CTRL_C.encode("ascii")) + + def __init__(self, cfg: config = None) -> None: + self.config = cfg if cfg is not None else config + + def __enter__(self) -> "ReadChar": + self.__org_SIGBREAK_handler = signal.getsignal(signal.SIGBREAK) + self.__org_SIGINT_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGBREAK, signal.default_int_handler) + signal.signal(signal.SIGINT, ReadChar.__silent_CTRL_C_callback) + return self + + def __exit__(self, type, value, traceback) -> None: + signal.signal(signal.SIGBREAK, self.__org_SIGBREAK_handler) + signal.signal(signal.SIGINT, self.__org_SIGINT_handler) + + @property + def key_waiting(self) -> bool: + """True if a key has been pressed and is waiting to be read. False if not.""" + return msvcrt.kbhit() + + def char(self) -> str: + """Reads a singel char from the input stream and returns it as a string of + length one. Does not require the user to press ENTER.""" + return msvcrt.getch().decode("latin1") + + def key(self) -> str: + """Reads a keypress from the input stream and returns it as a string. Keypressed + consisting of multiple characterrs will be read completly and be returned as a + string matching the definitions in `key.py`. + Does not require the user to press ENTER.""" + c = self.char() + + if c in self.config.INTERRUPT_KEYS: + raise KeyboardInterrupt + + # if it is a normal character: + if c not in "\x00\xe0": + return c + + # if it is a special key, read second half: + return "\x00" + self.char() + + def readchar() -> str: """Reads a single character from the input stream. Blocks until a character is available."""