Skip to content

bpo-46709: check eval breaker in specialized CALL opcodes #31404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 85 additions & 59 deletions Lib/unittest/test/test_break.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
import sys
import signal
import weakref

import unittest

from test import support


@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
class TestBreak(unittest.TestCase):
int_handler = None
# This number was smart-guessed, previously tests were failing
# after 7th run. So, we take `x * 2 + 1` to be sure.
default_repeats = 15

def setUp(self):
self._default_handler = signal.getsignal(signal.SIGINT)
Expand All @@ -24,6 +28,27 @@ def tearDown(self):
unittest.signals._interrupt_handler = None


def withRepeats(self, test_function, repeats=None):
if not support.check_impl_detail(cpython=True):
# Override repeats count on non-cpython to execute only once.
# Because this test only makes sense to be repeated on CPython.
repeats = 1
elif repeats is None:
repeats = self.default_repeats

for repeat in range(repeats):
with self.subTest(repeat=repeat):
# We don't run `setUp` for the very first repeat
# and we don't run `tearDown` for the very last one,
# because they are handled by the test class itself.
if repeat != 0:
self.setUp()
try:
test_function()
finally:
if repeat != repeats - 1:
self.tearDown()

def testInstallHandler(self):
default_handler = signal.getsignal(signal.SIGINT)
unittest.installHandler()
Expand All @@ -48,35 +73,34 @@ def testRegisterResult(self):
unittest.removeResult(result)

def testInterruptCaught(self):
default_handler = signal.getsignal(signal.SIGINT)

result = unittest.TestResult()
unittest.installHandler()
unittest.registerResult(result)

self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
result.breakCaught = True
self.assertTrue(result.shouldStop)

try:
test(result)
except KeyboardInterrupt:
self.fail("KeyboardInterrupt not handled")
self.assertTrue(result.breakCaught)
def test_function():
result = unittest.TestResult()
unittest.installHandler()
unittest.registerResult(result)

self.assertNotEqual(
signal.getsignal(signal.SIGINT),
self._default_handler,
)

try:
test(result)
except KeyboardInterrupt:
self.fail("KeyboardInterrupt not handled")
self.assertTrue(result.breakCaught)
self.withRepeats(test_function)

def testSecondInterrupt(self):
# Can't use skipIf decorator because the signal handler may have
# been changed after defining this method.
if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
self.skipTest("test requires SIGINT to not be ignored")
result = unittest.TestResult()
unittest.installHandler()
unittest.registerResult(result)

def test(result):
pid = os.getpid()
Expand All @@ -86,64 +110,66 @@ def test(result):
os.kill(pid, signal.SIGINT)
self.fail("Second KeyboardInterrupt not raised")

try:
test(result)
except KeyboardInterrupt:
pass
else:
self.fail("Second KeyboardInterrupt not raised")
self.assertTrue(result.breakCaught)
def test_function():
result = unittest.TestResult()
unittest.installHandler()
unittest.registerResult(result)

with self.assertRaises(KeyboardInterrupt):
test(result)
self.assertTrue(result.breakCaught)
self.withRepeats(test_function)

def testTwoResults(self):
unittest.installHandler()

result = unittest.TestResult()
unittest.registerResult(result)
new_handler = signal.getsignal(signal.SIGINT)
def testTwoResults(self):
def test_function():
unittest.installHandler()

result2 = unittest.TestResult()
unittest.registerResult(result2)
self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
result = unittest.TestResult()
unittest.registerResult(result)
new_handler = signal.getsignal(signal.SIGINT)

result3 = unittest.TestResult()
result2 = unittest.TestResult()
unittest.registerResult(result2)
self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
result3 = unittest.TestResult()

try:
test(result)
except KeyboardInterrupt:
self.fail("KeyboardInterrupt not handled")
try:
os.kill(os.getpid(), signal.SIGINT)
except KeyboardInterrupt:
self.fail("KeyboardInterrupt not handled")

self.assertTrue(result.shouldStop)
self.assertTrue(result2.shouldStop)
self.assertFalse(result3.shouldStop)
self.assertTrue(result.shouldStop)
self.assertTrue(result2.shouldStop)
self.assertFalse(result3.shouldStop)
self.withRepeats(test_function)


def testHandlerReplacedButCalled(self):
# Can't use skipIf decorator because the signal handler may have
# been changed after defining this method.
if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
self.skipTest("test requires SIGINT to not be ignored")
# If our handler has been replaced (is no longer installed) but is
# called by the *new* handler, then it isn't safe to delay the
# SIGINT and we should immediately delegate to the default handler
unittest.installHandler()

handler = signal.getsignal(signal.SIGINT)
def new_handler(frame, signum):
handler(frame, signum)
signal.signal(signal.SIGINT, new_handler)

try:
pid = os.getpid()
os.kill(pid, signal.SIGINT)
except KeyboardInterrupt:
pass
else:
self.fail("replaced but delegated handler doesn't raise interrupt")
def test_function():
# If our handler has been replaced (is no longer installed) but is
# called by the *new* handler, then it isn't safe to delay the
# SIGINT and we should immediately delegate to the default handler
unittest.installHandler()

handler = signal.getsignal(signal.SIGINT)
def new_handler(frame, signum):
handler(frame, signum)
signal.signal(signal.SIGINT, new_handler)

try:
os.kill(os.getpid(), signal.SIGINT)
except KeyboardInterrupt:
pass
else:
self.fail("replaced but delegated handler doesn't raise interrupt")
self.withRepeats(test_function)

def testRunner(self):
# Creating a TextTestRunner with the appropriate argument should
Expand Down
9 changes: 9 additions & 0 deletions Python/ceval.c
Original file line number Diff line number Diff line change
Expand Up @@ -4742,6 +4742,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand All @@ -4761,6 +4762,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand All @@ -4785,6 +4787,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand Down Expand Up @@ -4816,6 +4819,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand Down Expand Up @@ -4854,6 +4858,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
*/
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand Down Expand Up @@ -4896,6 +4901,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand Down Expand Up @@ -5013,6 +5019,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand Down Expand Up @@ -5040,6 +5047,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand Down Expand Up @@ -5067,6 +5075,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
CHECK_EVAL_BREAKER();
DISPATCH();
}

Expand Down