83
83
__version__ = "0.6"
84
84
85
85
__all__ = [
86
- "HTTPServer" , "ThreadingHTTPServer" , "BaseHTTPRequestHandler" ,
87
- "SimpleHTTPRequestHandler" , "CGIHTTPRequestHandler" ,
86
+ "HTTPServer" , "ThreadingHTTPServer" ,
87
+ "HTTPSServer" , "ThreadingHTTPSServer" ,
88
+ "BaseHTTPRequestHandler" , "SimpleHTTPRequestHandler" ,
89
+ "CGIHTTPRequestHandler" ,
88
90
]
89
91
90
92
import copy
@@ -149,6 +151,47 @@ class ThreadingHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
149
151
daemon_threads = True
150
152
151
153
154
+ class HTTPSServer (HTTPServer ):
155
+ def __init__ (self , server_address , RequestHandlerClass ,
156
+ bind_and_activate = True , * , certfile , keyfile = None ,
157
+ password = None , alpn_protocols = None ):
158
+ try :
159
+ import ssl
160
+ except ImportError :
161
+ raise RuntimeError ("SSL module is missing; "
162
+ "HTTPS support is unavailable" )
163
+
164
+ self .ssl = ssl
165
+ self .certfile = certfile
166
+ self .keyfile = keyfile
167
+ self .password = password
168
+ # Support by default HTTP/1.1
169
+ self .alpn_protocols = (
170
+ ["http/1.1" ] if alpn_protocols is None else alpn_protocols
171
+ )
172
+
173
+ super ().__init__ (server_address ,
174
+ RequestHandlerClass ,
175
+ bind_and_activate )
176
+
177
+ def server_activate (self ):
178
+ """Wrap the socket in SSLSocket."""
179
+ super ().server_activate ()
180
+ context = self ._create_context ()
181
+ self .socket = context .wrap_socket (self .socket , server_side = True )
182
+
183
+ def _create_context (self ):
184
+ """Create a secure SSL context."""
185
+ context = self .ssl .create_default_context (self .ssl .Purpose .CLIENT_AUTH )
186
+ context .load_cert_chain (self .certfile , self .keyfile , self .password )
187
+ context .set_alpn_protocols (self .alpn_protocols )
188
+ return context
189
+
190
+
191
+ class ThreadingHTTPSServer (socketserver .ThreadingMixIn , HTTPSServer ):
192
+ daemon_threads = True
193
+
194
+
152
195
class BaseHTTPRequestHandler (socketserver .StreamRequestHandler ):
153
196
154
197
"""HTTP request handler base class.
@@ -1263,20 +1306,29 @@ def _get_best_family(*address):
1263
1306
1264
1307
def test (HandlerClass = BaseHTTPRequestHandler ,
1265
1308
ServerClass = ThreadingHTTPServer ,
1266
- protocol = "HTTP/1.0" , port = 8000 , bind = None ):
1309
+ protocol = "HTTP/1.0" , port = 8000 , bind = None ,
1310
+ tls_cert = None , tls_key = None , tls_password = None ):
1267
1311
"""Test the HTTP request handler class.
1268
1312
1269
1313
This runs an HTTP server on port 8000 (or the port argument).
1270
1314
1271
1315
"""
1272
1316
ServerClass .address_family , addr = _get_best_family (bind , port )
1273
1317
HandlerClass .protocol_version = protocol
1274
- with ServerClass (addr , HandlerClass ) as httpd :
1318
+
1319
+ if tls_cert :
1320
+ server = ThreadingHTTPSServer (addr , HandlerClass , certfile = tls_cert ,
1321
+ keyfile = tls_key , password = tls_password )
1322
+ else :
1323
+ server = ServerClass (addr , HandlerClass )
1324
+
1325
+ with server as httpd :
1275
1326
host , port = httpd .socket .getsockname ()[:2 ]
1276
1327
url_host = f'[{ host } ]' if ':' in host else host
1328
+ protocol = 'HTTPS' if tls_cert else 'HTTP'
1277
1329
print (
1278
- f"Serving HTTP on { host } port { port } "
1279
- f"(http ://{ url_host } :{ port } /) ..."
1330
+ f"Serving { protocol } on { host } port { port } "
1331
+ f"({ protocol . lower () } ://{ url_host } :{ port } /) ..."
1280
1332
)
1281
1333
try :
1282
1334
httpd .serve_forever ()
@@ -1301,10 +1353,31 @@ def test(HandlerClass=BaseHTTPRequestHandler,
1301
1353
default = 'HTTP/1.0' ,
1302
1354
help = 'conform to this HTTP version '
1303
1355
'(default: %(default)s)' )
1356
+ parser .add_argument ('--tls-cert' , metavar = 'PATH' ,
1357
+ help = 'path to the TLS certificate chain file' )
1358
+ parser .add_argument ('--tls-key' , metavar = 'PATH' ,
1359
+ help = 'path to the TLS key file' )
1360
+ parser .add_argument ('--tls-password-file' , metavar = 'PATH' ,
1361
+ help = 'path to the password file for the TLS key' )
1304
1362
parser .add_argument ('port' , default = 8000 , type = int , nargs = '?' ,
1305
1363
help = 'bind to this port '
1306
1364
'(default: %(default)s)' )
1307
1365
args = parser .parse_args ()
1366
+
1367
+ if not args .tls_cert and args .tls_key :
1368
+ parser .error ("--tls-key requires --tls-cert to be set" )
1369
+
1370
+ tls_key_password = None
1371
+ if args .tls_password_file :
1372
+ if not args .tls_cert :
1373
+ parser .error ("--tls-password-file requires --tls-cert to be set" )
1374
+
1375
+ try :
1376
+ with open (args .tls_password_file , "r" , encoding = "utf-8" ) as f :
1377
+ tls_key_password = f .read ().strip ()
1378
+ except OSError as e :
1379
+ parser .error (f"Failed to read TLS password file: { e } " )
1380
+
1308
1381
if args .cgi :
1309
1382
handler_class = CGIHTTPRequestHandler
1310
1383
else :
@@ -1330,4 +1403,7 @@ def finish_request(self, request, client_address):
1330
1403
port = args .port ,
1331
1404
bind = args .bind ,
1332
1405
protocol = args .protocol ,
1406
+ tls_cert = args .tls_cert ,
1407
+ tls_key = args .tls_key ,
1408
+ tls_password = tls_key_password ,
1333
1409
)
0 commit comments