2
2
import random
3
3
import signal
4
4
import socket
5
+ import statistics
5
6
import subprocess
6
7
import sys
7
8
import time
@@ -949,13 +950,55 @@ class StressTest(unittest.TestCase):
949
950
previously tripped signal handlers.
950
951
"""
951
952
953
+ def setsig (self , signum , handler ):
954
+ old_handler = signal .signal (signum , handler )
955
+ self .addCleanup (signal .signal , signum , old_handler )
956
+
957
+ def measure_itimer_resolution (self ):
958
+ N = 20
959
+ times = []
960
+
961
+ def handler (signum = None , frame = None ):
962
+ if len (times ) < N :
963
+ times .append (time .perf_counter ())
964
+ # 1 µs is the smallest possible timer interval,
965
+ # we want to measure what the concrete duration
966
+ # will be on this platform
967
+ signal .setitimer (signal .ITIMER_REAL , 1e-6 )
968
+
969
+ self .addCleanup (signal .setitimer , signal .ITIMER_REAL , 0 )
970
+ self .setsig (signal .SIGALRM , handler )
971
+ handler ()
972
+ while len (times ) < N :
973
+ time .sleep (1e-3 )
974
+
975
+ durations = [times [i + 1 ] - times [i ] for i in range (len (times ) - 1 )]
976
+ med = statistics .median (durations )
977
+ if support .verbose :
978
+ print ("detected median itimer() resolution: %.6f s." % (med ,))
979
+ return med
980
+
981
+ def decide_itimer_count (self ):
982
+ # Some systems have poor setitimer() resolution (for example
983
+ # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
984
+ # number of sequential timers based on that.
985
+ reso = self .measure_itimer_resolution ()
986
+ if reso <= 1e-4 :
987
+ return 10000
988
+ elif reso <= 1e-2 :
989
+ return 100
990
+ else :
991
+ self .skipTest ("detected itimer resolution (%.3f s.) too high "
992
+ "(> 10 ms.) on this platform (or system too busy)"
993
+ % (reso ,))
994
+
952
995
@unittest .skipUnless (hasattr (signal , "setitimer" ),
953
996
"test needs setitimer()" )
954
997
def test_stress_delivery_dependent (self ):
955
998
"""
956
999
This test uses dependent signal handlers.
957
1000
"""
958
- N = 10000
1001
+ N = self . decide_itimer_count ()
959
1002
sigs = []
960
1003
961
1004
def first_handler (signum , frame ):
@@ -969,16 +1012,12 @@ def first_handler(signum, frame):
969
1012
def second_handler (signum = None , frame = None ):
970
1013
sigs .append (signum )
971
1014
972
- def setsig (signum , handler ):
973
- old_handler = signal .signal (signum , handler )
974
- self .addCleanup (signal .signal , signum , old_handler )
975
-
976
1015
# Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both
977
1016
# ascending and descending sequences (SIGUSR1 then SIGALRM,
978
1017
# SIGPROF then SIGALRM), we maximize chances of hitting a bug.
979
- setsig (signal .SIGPROF , first_handler )
980
- setsig (signal .SIGUSR1 , first_handler )
981
- setsig (signal .SIGALRM , second_handler ) # for ITIMER_REAL
1018
+ self . setsig (signal .SIGPROF , first_handler )
1019
+ self . setsig (signal .SIGUSR1 , first_handler )
1020
+ self . setsig (signal .SIGALRM , second_handler ) # for ITIMER_REAL
982
1021
983
1022
expected_sigs = 0
984
1023
deadline = time .time () + 15.0
@@ -1005,18 +1044,14 @@ def test_stress_delivery_simultaneous(self):
1005
1044
"""
1006
1045
This test uses simultaneous signal handlers.
1007
1046
"""
1008
- N = 10000
1047
+ N = self . decide_itimer_count ()
1009
1048
sigs = []
1010
1049
1011
1050
def handler (signum , frame ):
1012
1051
sigs .append (signum )
1013
1052
1014
- def setsig (signum , handler ):
1015
- old_handler = signal .signal (signum , handler )
1016
- self .addCleanup (signal .signal , signum , old_handler )
1017
-
1018
- setsig (signal .SIGUSR1 , handler )
1019
- setsig (signal .SIGALRM , handler ) # for ITIMER_REAL
1053
+ self .setsig (signal .SIGUSR1 , handler )
1054
+ self .setsig (signal .SIGALRM , handler ) # for ITIMER_REAL
1020
1055
1021
1056
expected_sigs = 0
1022
1057
deadline = time .time () + 15.0
0 commit comments