From 3db0eb7eeb00ed010b5df0e183348f879db53a1c Mon Sep 17 00:00:00 2001 From: y5c4l3 Date: Mon, 16 Sep 2024 17:32:30 +0800 Subject: [PATCH 01/10] NEWS for Windows REPL virtual terminal support Signed-off-by: y5c4l3 --- .../Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst diff --git a/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst b/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst new file mode 100644 index 00000000000000..423f473a0e5797 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst @@ -0,0 +1,3 @@ +Turn on virtual terminal mode in REPL Windows console to emit sequences that +bracketed-paste modes require. If the terminal may not support paste modes, +provides a flag to help callers turn off related features. From 6d965582d6814c65b31ac6a2959b2ff28e3787b1 Mon Sep 17 00:00:00 2001 From: y5c4l3 Date: Mon, 16 Sep 2024 18:28:29 +0800 Subject: [PATCH 02/10] Add Windows REPL virtual terminal queue To support virtual terminal mode in Windows PYREPL, we need a scanner to read over the supported escaped VT sequences. Signed-off-by: y5c4l3 --- Lib/_pyrepl/windows_eventqueue.py | 119 ++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 Lib/_pyrepl/windows_eventqueue.py diff --git a/Lib/_pyrepl/windows_eventqueue.py b/Lib/_pyrepl/windows_eventqueue.py new file mode 100644 index 00000000000000..cbbfd6be1a94b0 --- /dev/null +++ b/Lib/_pyrepl/windows_eventqueue.py @@ -0,0 +1,119 @@ +""" +Windows event and VT sequence scanner, similar to `unix_eventqueue.py` +""" + +from collections import deque + +from . import keymap +from .console import Event +from .trace import trace +import os + +# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#input-sequences +VT_MAP: dict[bytes, str] = { + b'\x1b[A': 'up', + b'\x1b[B': 'down', + b'\x1b[C': 'right', + b'\x1b[D': 'left', + b'\x1b[1;5D': 'ctrl left', + b'\x1b[1;5C': 'ctrl right', + + b'\x1b[H': 'home', + b'\x1b[F': 'end', + + b'\x7f': 'backspace', + b'\x1b[2~': 'insert', + b'\x1b[3~': 'delete', + b'\x1b[5~': 'page up', + b'\x1b[6~': 'page down', + + b'\x1bOP': 'f1', + b'\x1bOQ': 'f2', + b'\x1bOR': 'f3', + b'\x1bOS': 'f4', + b'\x1b[15~': 'f5', + b'\x1b[17~]': 'f6', + b'\x1b[18~]': 'f7', + b'\x1b[19~]': 'f8', + b'\x1b[20~]': 'f9', + b'\x1b[21~]': 'f10', + b'\x1b[23~]': 'f11', + b'\x1b[24~]': 'f12', +} + +class EventQueue: + def __init__(self, encoding: str) -> None: + self.compiled_keymap = keymap.compile_keymap(VT_MAP) + self.keymap = self.compiled_keymap + trace("keymap {k!r}", k=self.keymap) + self.encoding = encoding + self.events: deque[Event] = deque() + self.buf = bytearray() + + def get(self) -> Event | None: + """ + Retrieves the next event from the queue. + """ + if self.events: + return self.events.popleft() + else: + return None + + def empty(self) -> bool: + """ + Checks if the queue is empty. + """ + return not self.events + + def flush_buf(self) -> bytearray: + """ + Flushes the buffer and returns its contents. + """ + old = self.buf + self.buf = bytearray() + return old + + def insert(self, event: Event) -> None: + """ + Inserts an event into the queue. + """ + trace('added event {event}', event=event) + self.events.append(event) + + def push(self, char: int | bytes) -> None: + """ + Processes a character by updating the buffer and handling special key mappings. + """ + ord_char = char if isinstance(char, int) else ord(char) + char = bytes(bytearray((ord_char,))) + self.buf.append(ord_char) + if char in self.keymap: + if self.keymap is self.compiled_keymap: + #sanity check, buffer is empty when a special key comes + assert len(self.buf) == 1 + k = self.keymap[char] + trace('found map {k!r}', k=k) + if isinstance(k, dict): + self.keymap = k + else: + self.insert(Event('key', k, self.flush_buf())) + self.keymap = self.compiled_keymap + + elif self.buf and self.buf[0] == 27: # escape + # escape sequence not recognized by our keymap: propagate it + # outside so that i can be recognized as an M-... key (see also + # the docstring in keymap.py + trace('unrecognized escape sequence, propagating...') + self.keymap = self.compiled_keymap + self.insert(Event('key', '\033', bytearray(b'\033'))) + for _c in self.flush_buf()[1:]: + self.push(_c) + + else: + try: + decoded = bytes(self.buf).decode(self.encoding) + except UnicodeError: + return + else: + self.insert(Event('key', decoded, self.flush_buf())) + self.keymap = self.compiled_keymap From 0fed3f7b23966d99f64ceb821cb48f4f368abd24 Mon Sep 17 00:00:00 2001 From: y5c4l3 Date: Mon, 16 Sep 2024 18:29:46 +0800 Subject: [PATCH 03/10] Enable virtual terminal mode in Windows REPL Windows REPL input was using virtual key mode, which does not support terminal escape sequences. This patch calls `SetConsoleMode` properly when initializing and send sequences to enable bracketed-paste modes to support verbatim copy-and-paste. Signed-off-by: y5c4l3 --- Lib/_pyrepl/windows_console.py | 72 ++++++++++++++++++++++++++++++---- 1 file changed, 65 insertions(+), 7 deletions(-) diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index d457d2b5a338eb..f09a2cca6f1885 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -42,6 +42,7 @@ from .console import Event, Console from .trace import trace from .utils import wlen +from .windows_eventqueue import EventQueue try: from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined] @@ -94,7 +95,9 @@ def __init__(self, err: int | None, descr: str | None = None) -> None: 0x83: "f20", # VK_F20 } -# Console escape codes: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences +# Virtual terminal output sequences +# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#output-sequences +# Check `windows_eventqueue.py` for input sequences ERASE_IN_LINE = "\x1b[K" MOVE_LEFT = "\x1b[{}D" MOVE_RIGHT = "\x1b[{}C" @@ -106,6 +109,12 @@ def __init__(self, err: int | None, descr: str | None = None) -> None: class _error(Exception): pass +def _supports_vt(): + try: + import nt + return nt._supports_virtual_terminal() + except (ImportError, AttributeError): + return False class WindowsConsole(Console): def __init__( @@ -117,17 +126,36 @@ def __init__( ): super().__init__(f_in, f_out, term, encoding) + self.__vt_support = _supports_vt() + self.__vt_bracketed_paste = False + + if self.__vt_support: + trace('console supports virtual terminal') + + # Should make educated guess to determine the terminal type. + # Currently enable bracketed-paste only if it's Windows Terminal. + if 'WT_SESSION' in os.environ: + trace('console supports bracketed-paste sequence') + self.__vt_bracketed_paste = True + + # Save original console modes so we can recover on cleanup. + original_input_mode = DWORD() + GetConsoleMode(InHandle, original_input_mode) + trace(f'saved original input mode 0x{original_input_mode.value:x}') + self.__original_input_mode = original_input_mode.value + SetConsoleMode( OutHandle, ENABLE_WRAP_AT_EOL_OUTPUT | ENABLE_PROCESSED_OUTPUT | ENABLE_VIRTUAL_TERMINAL_PROCESSING, ) + self.screen: list[str] = [] self.width = 80 self.height = 25 self.__offset = 0 - self.event_queue: deque[Event] = deque() + self.event_queue = EventQueue(encoding) try: self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined] except ValueError: @@ -291,6 +319,12 @@ def _enable_blinking(self): def _disable_blinking(self): self.__write("\x1b[?12l") + def _enable_bracketed_paste(self) -> None: + self.__write("\x1b[?2004h") + + def _disable_bracketed_paste(self) -> None: + self.__write("\x1b[?2004l") + def __write(self, text: str) -> None: if "\x1a" in text: text = ''.join(["^Z" if x == '\x1a' else x for x in text]) @@ -320,8 +354,17 @@ def prepare(self) -> None: self.__gone_tall = 0 self.__offset = 0 + if self.__vt_support: + SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT) + if self.__vt_bracketed_paste: + self._enable_bracketed_paste() + def restore(self) -> None: - pass + if self.__vt_support: + # Recover to original mode before running REPL + SetConsoleMode(InHandle, self.__original_input_mode) + if self.__vt_bracketed_paste: + self._disable_bracketed_paste() def _move_relative(self, x: int, y: int) -> None: """Moves relative to the current __posxy""" @@ -342,7 +385,7 @@ def move_cursor(self, x: int, y: int) -> None: raise ValueError(f"Bad cursor position {x}, {y}") if y < self.__offset or y >= self.__offset + self.height: - self.event_queue.insert(0, Event("scroll", "")) + self.event_queue.insert(Event("scroll", "")) else: self._move_relative(x, y) self.__posxy = x, y @@ -390,10 +433,8 @@ def get_event(self, block: bool = True) -> Event | None: """Return an Event instance. Returns None if |block| is false and there is no event pending, otherwise waits for the completion of an event.""" - if self.event_queue: - return self.event_queue.pop() - while True: + while self.event_queue.empty(): rec = self._read_input(block) if rec is None: return None @@ -430,8 +471,13 @@ def get_event(self, block: bool = True) -> Event | None: continue return None + elif self.__vt_support: + # If virtual terminal is enabled, scanning VT sequences + self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar) + continue return Event(evt="key", data=key, raw=rec.Event.KeyEvent.uChar.UnicodeChar) + return self.event_queue.get() def push_char(self, char: int | bytes) -> None: """ @@ -553,6 +599,13 @@ class INPUT_RECORD(Structure): MOUSE_EVENT = 0x02 WINDOW_BUFFER_SIZE_EVENT = 0x04 +ENABLE_PROCESSED_INPUT = 0x0001 +ENABLE_LINE_INPUT = 0x0002 +ENABLE_ECHO_INPUT = 0x0004 +ENABLE_MOUSE_INPUT = 0x0010 +ENABLE_INSERT_MODE = 0x0020 +ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200 + ENABLE_PROCESSED_OUTPUT = 0x01 ENABLE_WRAP_AT_EOL_OUTPUT = 0x02 ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04 @@ -584,6 +637,10 @@ class INPUT_RECORD(Structure): ] ScrollConsoleScreenBuffer.restype = BOOL + GetConsoleMode = _KERNEL32.GetConsoleMode + GetConsoleMode.argtypes = [HANDLE, POINTER(DWORD)] + GetConsoleMode.restype = BOOL + SetConsoleMode = _KERNEL32.SetConsoleMode SetConsoleMode.argtypes = [HANDLE, DWORD] SetConsoleMode.restype = BOOL @@ -610,6 +667,7 @@ def _win_only(*args, **kwargs): GetStdHandle = _win_only GetConsoleScreenBufferInfo = _win_only ScrollConsoleScreenBuffer = _win_only + GetConsoleMode = _win_only SetConsoleMode = _win_only ReadConsoleInput = _win_only GetNumberOfConsoleInputEvents = _win_only From d3a16986778a2aca95cc609813d3b395ff080184 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Thu, 23 Jan 2025 13:08:02 +0100 Subject: [PATCH 04/10] In VM mode, always request bracketed paste --- Lib/_pyrepl/windows_console.py | 13 ++----------- .../2024-09-16-17-03-52.gh-issue-124096.znin0O.rst | 6 +++--- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index 8b8b10ea0b200b..a275d2398fe958 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -131,17 +131,10 @@ def __init__( super().__init__(f_in, f_out, term, encoding) self.__vt_support = _supports_vt() - self.__vt_bracketed_paste = False if self.__vt_support: trace('console supports virtual terminal') - # Should make educated guess to determine the terminal type. - # Currently enable bracketed-paste only if it's Windows Terminal. - if 'WT_SESSION' in os.environ: - trace('console supports bracketed-paste sequence') - self.__vt_bracketed_paste = True - # Save original console modes so we can recover on cleanup. original_input_mode = DWORD() GetConsoleMode(InHandle, original_input_mode) @@ -360,15 +353,13 @@ def prepare(self) -> None: if self.__vt_support: SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT) - if self.__vt_bracketed_paste: - self._enable_bracketed_paste() + self._enable_bracketed_paste() def restore(self) -> None: if self.__vt_support: # Recover to original mode before running REPL + self._disable_bracketed_paste() SetConsoleMode(InHandle, self.__original_input_mode) - if self.__vt_bracketed_paste: - self._disable_bracketed_paste() def _move_relative(self, x: int, y: int) -> None: """Moves relative to the current posxy""" diff --git a/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst b/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst index 423f473a0e5797..1cf1c8bcf397a8 100644 --- a/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst +++ b/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst @@ -1,3 +1,3 @@ -Turn on virtual terminal mode in REPL Windows console to emit sequences that -bracketed-paste modes require. If the terminal may not support paste modes, -provides a flag to help callers turn off related features. +Turn on virtual terminal mode and enable barcketed paste in REPL Windows +console. (If the terminal does not support bracketed paste, enabling it +does nothing.) From 89ba4f28cdca1cd0a86af37b4519623e7a8e4b58 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Thu, 23 Jan 2025 13:36:31 +0100 Subject: [PATCH 05/10] event_queue is no longer a deque --- Lib/_pyrepl/windows_console.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index a275d2398fe958..6d1a2f50dbf462 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -460,7 +460,7 @@ def get_event(self, block: bool = True) -> Event | None: key = f"ctrl {key}" elif key_event.dwControlKeyState & ALT_ACTIVE: # queue the key, return the meta command - self.event_queue.insert(0, Event(evt="key", data=key, raw=key)) + self.event_queue.insert(Event(evt="key", data=key, raw=key)) return Event(evt="key", data="\033") # keymap.py uses this for meta return Event(evt="key", data=key, raw=key) if block: @@ -474,7 +474,7 @@ def get_event(self, block: bool = True) -> Event | None: if key_event.dwControlKeyState & ALT_ACTIVE: # queue the key, return the meta command - self.event_queue.insert(0, Event(evt="key", data=key, raw=raw_key)) + self.event_queue.insert(Event(evt="key", data=key, raw=raw_key)) return Event(evt="key", data="\033") # keymap.py uses this for meta return Event(evt="key", data=key, raw=raw_key) From 290aa5ea8c91f0db5940fe89d28531832d1c714d Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Thu, 23 Jan 2025 22:23:20 +0000 Subject: [PATCH 06/10] Update Lib/_pyrepl/windows_eventqueue.py Co-authored-by: Dustin L. Howett --- Lib/_pyrepl/windows_eventqueue.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Lib/_pyrepl/windows_eventqueue.py b/Lib/_pyrepl/windows_eventqueue.py index cbbfd6be1a94b0..8d6b73c6e7edd3 100644 --- a/Lib/_pyrepl/windows_eventqueue.py +++ b/Lib/_pyrepl/windows_eventqueue.py @@ -32,13 +32,13 @@ b'\x1bOR': 'f3', b'\x1bOS': 'f4', b'\x1b[15~': 'f5', - b'\x1b[17~]': 'f6', - b'\x1b[18~]': 'f7', - b'\x1b[19~]': 'f8', - b'\x1b[20~]': 'f9', - b'\x1b[21~]': 'f10', - b'\x1b[23~]': 'f11', - b'\x1b[24~]': 'f12', + b'\x1b[17~': 'f6', + b'\x1b[18~': 'f7', + b'\x1b[19~': 'f8', + b'\x1b[20~': 'f9', + b'\x1b[21~': 'f10', + b'\x1b[23~': 'f11', + b'\x1b[24~': 'f12', } class EventQueue: From 2b90017527f8b8ae7faf4740614a5a45a82d078e Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Wed, 5 Feb 2025 10:16:35 +0100 Subject: [PATCH 07/10] Apply suggestions from code review Co-authored-by: wheeheee <104880306+wheeheee@users.noreply.github.com> --- Lib/_pyrepl/windows_eventqueue.py | 12 ++++++------ .../2024-09-16-17-03-52.gh-issue-124096.znin0O.rst | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Lib/_pyrepl/windows_eventqueue.py b/Lib/_pyrepl/windows_eventqueue.py index 8d6b73c6e7edd3..f997215b2324b1 100644 --- a/Lib/_pyrepl/windows_eventqueue.py +++ b/Lib/_pyrepl/windows_eventqueue.py @@ -27,11 +27,11 @@ b'\x1b[5~': 'page up', b'\x1b[6~': 'page down', - b'\x1bOP': 'f1', - b'\x1bOQ': 'f2', - b'\x1bOR': 'f3', - b'\x1bOS': 'f4', - b'\x1b[15~': 'f5', + b'\x1bOP': 'f1', + b'\x1bOQ': 'f2', + b'\x1bOR': 'f3', + b'\x1bOS': 'f4', + b'\x1b[15~': 'f5', b'\x1b[17~': 'f6', b'\x1b[18~': 'f7', b'\x1b[19~': 'f8', @@ -89,7 +89,7 @@ def push(self, char: int | bytes) -> None: self.buf.append(ord_char) if char in self.keymap: if self.keymap is self.compiled_keymap: - #sanity check, buffer is empty when a special key comes + # sanity check, buffer is empty when a special key comes assert len(self.buf) == 1 k = self.keymap[char] trace('found map {k!r}', k=k) diff --git a/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst b/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst index 1cf1c8bcf397a8..2a6aed98c55374 100644 --- a/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst +++ b/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst @@ -1,3 +1,3 @@ -Turn on virtual terminal mode and enable barcketed paste in REPL Windows +Turn on virtual terminal mode and enable bracketed paste in REPL on Windows console. (If the terminal does not support bracketed paste, enabling it does nothing.) From cfc66ebac0ed4c2dc1e4703048b0457de776f333 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Fri, 14 Feb 2025 16:56:55 +0100 Subject: [PATCH 08/10] Deduplicate common EventQueue code; run the tests on both OSes --- Lib/_pyrepl/base_eventqueue.py | 108 ++++++++++++++++++ Lib/_pyrepl/unix_eventqueue.py | 86 +------------- Lib/_pyrepl/windows_eventqueue.py | 85 +------------- ..._unix_eventqueue.py => test_eventqueue.py} | 70 ++++++++---- 4 files changed, 165 insertions(+), 184 deletions(-) create mode 100644 Lib/_pyrepl/base_eventqueue.py rename Lib/test/test_pyrepl/{test_unix_eventqueue.py => test_eventqueue.py} (70%) diff --git a/Lib/_pyrepl/base_eventqueue.py b/Lib/_pyrepl/base_eventqueue.py new file mode 100644 index 00000000000000..3c8289043fa261 --- /dev/null +++ b/Lib/_pyrepl/base_eventqueue.py @@ -0,0 +1,108 @@ +# Copyright 2000-2008 Michael Hudson-Doyle +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +""" +OS-independent base for an event and VT sequence scanner + +See unix_eventqueue and windows_eventqueue for subclasses. +""" + +from collections import deque + +from . import keymap +from .console import Event +from .trace import trace + +class BaseEventQueue: + def __init__(self, encoding: str, keymap_dict=dict[bytes, str]) -> None: + self.compiled_keymap = keymap.compile_keymap(keymap_dict) + self.keymap = self.compiled_keymap + trace("keymap {k!r}", k=self.keymap) + self.encoding = encoding + self.events: deque[Event] = deque() + self.buf = bytearray() + + def get(self) -> Event | None: + """ + Retrieves the next event from the queue. + """ + if self.events: + return self.events.popleft() + else: + return None + + def empty(self) -> bool: + """ + Checks if the queue is empty. + """ + return not self.events + + def flush_buf(self) -> bytearray: + """ + Flushes the buffer and returns its contents. + """ + old = self.buf + self.buf = bytearray() + return old + + def insert(self, event: Event) -> None: + """ + Inserts an event into the queue. + """ + trace('added event {event}', event=event) + self.events.append(event) + + def push(self, char: int | bytes) -> None: + """ + Processes a character by updating the buffer and handling special key mappings. + """ + ord_char = char if isinstance(char, int) else ord(char) + char = bytes(bytearray((ord_char,))) + self.buf.append(ord_char) + if char in self.keymap: + if self.keymap is self.compiled_keymap: + # sanity check, buffer is empty when a special key comes + assert len(self.buf) == 1 + k = self.keymap[char] + trace('found map {k!r}', k=k) + if isinstance(k, dict): + self.keymap = k + else: + self.insert(Event('key', k, self.flush_buf())) + self.keymap = self.compiled_keymap + + elif self.buf and self.buf[0] == 27: # escape + # escape sequence not recognized by our keymap: propagate it + # outside so that i can be recognized as an M-... key (see also + # the docstring in keymap.py + trace('unrecognized escape sequence, propagating...') + self.keymap = self.compiled_keymap + self.insert(Event('key', '\033', bytearray(b'\033'))) + for _c in self.flush_buf()[1:]: + self.push(_c) + + else: + try: + decoded = bytes(self.buf).decode(self.encoding) + except UnicodeError: + return + else: + self.insert(Event('key', decoded, self.flush_buf())) + self.keymap = self.compiled_keymap diff --git a/Lib/_pyrepl/unix_eventqueue.py b/Lib/_pyrepl/unix_eventqueue.py index 70cfade26e23b1..29b3e9dd5efd07 100644 --- a/Lib/_pyrepl/unix_eventqueue.py +++ b/Lib/_pyrepl/unix_eventqueue.py @@ -18,12 +18,9 @@ # CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -from collections import deque - -from . import keymap -from .console import Event from . import curses from .trace import trace +from .base_eventqueue import BaseEventQueue from termios import tcgetattr, VERASE import os @@ -70,83 +67,10 @@ def get_terminal_keycodes() -> dict[bytes, str]: keycodes.update(CTRL_ARROW_KEYCODES) return keycodes -class EventQueue: +class EventQueue(BaseEventQueue): def __init__(self, fd: int, encoding: str) -> None: - self.keycodes = get_terminal_keycodes() + keycodes = get_terminal_keycodes() if os.isatty(fd): backspace = tcgetattr(fd)[6][VERASE] - self.keycodes[backspace] = "backspace" - self.compiled_keymap = keymap.compile_keymap(self.keycodes) - self.keymap = self.compiled_keymap - trace("keymap {k!r}", k=self.keymap) - self.encoding = encoding - self.events: deque[Event] = deque() - self.buf = bytearray() - - def get(self) -> Event | None: - """ - Retrieves the next event from the queue. - """ - if self.events: - return self.events.popleft() - else: - return None - - def empty(self) -> bool: - """ - Checks if the queue is empty. - """ - return not self.events - - def flush_buf(self) -> bytearray: - """ - Flushes the buffer and returns its contents. - """ - old = self.buf - self.buf = bytearray() - return old - - def insert(self, event: Event) -> None: - """ - Inserts an event into the queue. - """ - trace('added event {event}', event=event) - self.events.append(event) - - def push(self, char: int | bytes) -> None: - """ - Processes a character by updating the buffer and handling special key mappings. - """ - ord_char = char if isinstance(char, int) else ord(char) - char = bytes(bytearray((ord_char,))) - self.buf.append(ord_char) - if char in self.keymap: - if self.keymap is self.compiled_keymap: - #sanity check, buffer is empty when a special key comes - assert len(self.buf) == 1 - k = self.keymap[char] - trace('found map {k!r}', k=k) - if isinstance(k, dict): - self.keymap = k - else: - self.insert(Event('key', k, self.flush_buf())) - self.keymap = self.compiled_keymap - - elif self.buf and self.buf[0] == 27: # escape - # escape sequence not recognized by our keymap: propagate it - # outside so that i can be recognized as an M-... key (see also - # the docstring in keymap.py - trace('unrecognized escape sequence, propagating...') - self.keymap = self.compiled_keymap - self.insert(Event('key', '\033', bytearray(b'\033'))) - for _c in self.flush_buf()[1:]: - self.push(_c) - - else: - try: - decoded = bytes(self.buf).decode(self.encoding) - except UnicodeError: - return - else: - self.insert(Event('key', decoded, self.flush_buf())) - self.keymap = self.compiled_keymap + keycodes[backspace] = "backspace" + BaseEventQueue.__init__(self, encoding, keycodes) diff --git a/Lib/_pyrepl/windows_eventqueue.py b/Lib/_pyrepl/windows_eventqueue.py index f997215b2324b1..d99722f9a16a93 100644 --- a/Lib/_pyrepl/windows_eventqueue.py +++ b/Lib/_pyrepl/windows_eventqueue.py @@ -1,13 +1,9 @@ """ -Windows event and VT sequence scanner, similar to `unix_eventqueue.py` +Windows event and VT sequence scanner """ -from collections import deque +from .base_eventqueue import BaseEventQueue -from . import keymap -from .console import Event -from .trace import trace -import os # Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#input-sequences VT_MAP: dict[bytes, str] = { @@ -41,79 +37,6 @@ b'\x1b[24~': 'f12', } -class EventQueue: +class EventQueue(BaseEventQueue): def __init__(self, encoding: str) -> None: - self.compiled_keymap = keymap.compile_keymap(VT_MAP) - self.keymap = self.compiled_keymap - trace("keymap {k!r}", k=self.keymap) - self.encoding = encoding - self.events: deque[Event] = deque() - self.buf = bytearray() - - def get(self) -> Event | None: - """ - Retrieves the next event from the queue. - """ - if self.events: - return self.events.popleft() - else: - return None - - def empty(self) -> bool: - """ - Checks if the queue is empty. - """ - return not self.events - - def flush_buf(self) -> bytearray: - """ - Flushes the buffer and returns its contents. - """ - old = self.buf - self.buf = bytearray() - return old - - def insert(self, event: Event) -> None: - """ - Inserts an event into the queue. - """ - trace('added event {event}', event=event) - self.events.append(event) - - def push(self, char: int | bytes) -> None: - """ - Processes a character by updating the buffer and handling special key mappings. - """ - ord_char = char if isinstance(char, int) else ord(char) - char = bytes(bytearray((ord_char,))) - self.buf.append(ord_char) - if char in self.keymap: - if self.keymap is self.compiled_keymap: - # sanity check, buffer is empty when a special key comes - assert len(self.buf) == 1 - k = self.keymap[char] - trace('found map {k!r}', k=k) - if isinstance(k, dict): - self.keymap = k - else: - self.insert(Event('key', k, self.flush_buf())) - self.keymap = self.compiled_keymap - - elif self.buf and self.buf[0] == 27: # escape - # escape sequence not recognized by our keymap: propagate it - # outside so that i can be recognized as an M-... key (see also - # the docstring in keymap.py - trace('unrecognized escape sequence, propagating...') - self.keymap = self.compiled_keymap - self.insert(Event('key', '\033', bytearray(b'\033'))) - for _c in self.flush_buf()[1:]: - self.push(_c) - - else: - try: - decoded = bytes(self.buf).decode(self.encoding) - except UnicodeError: - return - else: - self.insert(Event('key', decoded, self.flush_buf())) - self.keymap = self.compiled_keymap + BaseEventQueue.__init__(self, encoding, VT_MAP) diff --git a/Lib/test/test_pyrepl/test_unix_eventqueue.py b/Lib/test/test_pyrepl/test_eventqueue.py similarity index 70% rename from Lib/test/test_pyrepl/test_unix_eventqueue.py rename to Lib/test/test_pyrepl/test_eventqueue.py index 301f79927a741f..849fec12e7906f 100644 --- a/Lib/test/test_pyrepl/test_unix_eventqueue.py +++ b/Lib/test/test_pyrepl/test_eventqueue.py @@ -5,67 +5,73 @@ try: from _pyrepl.console import Event - from _pyrepl.unix_eventqueue import EventQueue + from _pyrepl import base_eventqueue except ImportError: pass -@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows") -@patch("_pyrepl.curses.tigetstr", lambda x: b"") -class TestUnixEventQueue(unittest.TestCase): - def setUp(self): - self.file = tempfile.TemporaryFile() +try: + from _pyrepl import unix_eventqueue +except ImportError: + pass - def tearDown(self) -> None: - self.file.close() +try: + from _pyrepl import windows_eventqueue +except ImportError: + pass + +class EventQueueTestBase: + """OS-independent mixin""" + def make_eventqueue(self) -> base_eventqueue.BaseEventQueue: + raise NotImplementedError() def test_get(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() event = Event("key", "a", b"a") eq.insert(event) self.assertEqual(eq.get(), event) def test_empty(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() self.assertTrue(eq.empty()) eq.insert(Event("key", "a", b"a")) self.assertFalse(eq.empty()) def test_flush_buf(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.buf.extend(b"test") self.assertEqual(eq.flush_buf(), b"test") self.assertEqual(eq.buf, bytearray()) def test_insert(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() event = Event("key", "a", b"a") eq.insert(event) self.assertEqual(eq.events[0], event) - @patch("_pyrepl.unix_eventqueue.keymap") + @patch("_pyrepl.base_eventqueue.keymap") def test_push_with_key_in_keymap(self, mock_keymap): mock_keymap.compile_keymap.return_value = {"a": "b"} - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {b"a": "b"} eq.push("a") mock_keymap.compile_keymap.assert_called() self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "b") - @patch("_pyrepl.unix_eventqueue.keymap") + @patch("_pyrepl.base_eventqueue.keymap") def test_push_without_key_in_keymap(self, mock_keymap): mock_keymap.compile_keymap.return_value = {"a": "b"} - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {b"c": "d"} eq.push("a") mock_keymap.compile_keymap.assert_called() self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "a") - @patch("_pyrepl.unix_eventqueue.keymap") + @patch("_pyrepl.base_eventqueue.keymap") def test_push_with_keymap_in_keymap(self, mock_keymap): mock_keymap.compile_keymap.return_value = {"a": "b"} - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {b"a": {b"b": "c"}} eq.push("a") mock_keymap.compile_keymap.assert_called() @@ -77,10 +83,10 @@ def test_push_with_keymap_in_keymap(self, mock_keymap): self.assertEqual(eq.events[1].evt, "key") self.assertEqual(eq.events[1].data, "d") - @patch("_pyrepl.unix_eventqueue.keymap") + @patch("_pyrepl.base_eventqueue.keymap") def test_push_with_keymap_in_keymap_and_escape(self, mock_keymap): mock_keymap.compile_keymap.return_value = {"a": "b"} - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {b"a": {b"b": "c"}} eq.push("a") mock_keymap.compile_keymap.assert_called() @@ -94,7 +100,7 @@ def test_push_with_keymap_in_keymap_and_escape(self, mock_keymap): self.assertEqual(eq.events[1].data, "b") def test_push_special_key(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {} eq.push("\x1b") eq.push("[") @@ -103,7 +109,7 @@ def test_push_special_key(self): self.assertEqual(eq.events[0].data, "\x1b") def test_push_unrecognized_escape_sequence(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {} eq.push("\x1b") eq.push("[") @@ -115,3 +121,23 @@ def test_push_unrecognized_escape_sequence(self): self.assertEqual(eq.events[1].data, "[") self.assertEqual(eq.events[2].evt, "key") self.assertEqual(eq.events[2].data, "Z") + + +@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows") +@patch("_pyrepl.curses.tigetstr", lambda x: b"") +class TestUnixEventQueue(EventQueueTestBase, unittest.TestCase): + def setUp(self): + self.file = tempfile.TemporaryFile() + + def tearDown(self) -> None: + self.file.close() + + def make_eventqueue(self) -> base_eventqueue.BaseEventQueue: + return unix_eventqueue.EventQueue(self.file.fileno(), "utf-8") + + +@unittest.skipIf(sys.platform != "win32", "No Windows event queue on Unix") +@patch("_pyrepl.curses.tigetstr", lambda x: b"") +class TestWindowsEventQueue(EventQueueTestBase, unittest.TestCase): + def make_eventqueue(self) -> base_eventqueue.BaseEventQueue: + return windows_eventqueue.EventQueue("utf-8") From 2867297628b6b9d4ce988218d032a9404703abc6 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Fri, 14 Feb 2025 17:30:38 +0100 Subject: [PATCH 09/10] Adjust the test The patch decorator was being called even on the skipped test. --- Lib/test/test_pyrepl/test_eventqueue.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_pyrepl/test_eventqueue.py b/Lib/test/test_pyrepl/test_eventqueue.py index 849fec12e7906f..a1bac38fbd43f5 100644 --- a/Lib/test/test_pyrepl/test_eventqueue.py +++ b/Lib/test/test_pyrepl/test_eventqueue.py @@ -2,6 +2,7 @@ import unittest import sys from unittest.mock import patch +from test import support try: from _pyrepl.console import Event @@ -123,10 +124,10 @@ def test_push_unrecognized_escape_sequence(self): self.assertEqual(eq.events[2].data, "Z") -@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows") -@patch("_pyrepl.curses.tigetstr", lambda x: b"") +@unittest.skipIf(support.MS_WINDOWS, "No Unix event queue on Windows") class TestUnixEventQueue(EventQueueTestBase, unittest.TestCase): def setUp(self): + self.enterContext(patch("_pyrepl.curses.tigetstr", lambda x: b"")) self.file = tempfile.TemporaryFile() def tearDown(self) -> None: @@ -136,8 +137,7 @@ def make_eventqueue(self) -> base_eventqueue.BaseEventQueue: return unix_eventqueue.EventQueue(self.file.fileno(), "utf-8") -@unittest.skipIf(sys.platform != "win32", "No Windows event queue on Unix") -@patch("_pyrepl.curses.tigetstr", lambda x: b"") +@unittest.skipUnless(support.MS_WINDOWS, "No Windows event queue on Unix") class TestWindowsEventQueue(EventQueueTestBase, unittest.TestCase): def make_eventqueue(self) -> base_eventqueue.BaseEventQueue: return windows_eventqueue.EventQueue("utf-8") From b15261b3b29260107f1ae5748ec91bd876c98530 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Fri, 14 Feb 2025 17:57:07 +0100 Subject: [PATCH 10/10] Annotation, not default... --- Lib/_pyrepl/base_eventqueue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/_pyrepl/base_eventqueue.py b/Lib/_pyrepl/base_eventqueue.py index 3c8289043fa261..9cae1db112a838 100644 --- a/Lib/_pyrepl/base_eventqueue.py +++ b/Lib/_pyrepl/base_eventqueue.py @@ -31,7 +31,7 @@ from .trace import trace class BaseEventQueue: - def __init__(self, encoding: str, keymap_dict=dict[bytes, str]) -> None: + def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None: self.compiled_keymap = keymap.compile_keymap(keymap_dict) self.keymap = self.compiled_keymap trace("keymap {k!r}", k=self.keymap)