|
9 | 9 |
|
10 | 10 | from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
|
11 | 11 | from kafka.protocol.api import RequestHeader
|
| 12 | +from kafka.protocol.group import HeartbeatResponse |
12 | 13 | from kafka.protocol.metadata import MetadataRequest
|
13 | 14 | from kafka.protocol.produce import ProduceRequest
|
14 | 15 |
|
@@ -360,3 +361,29 @@ def test_requests_timed_out(conn):
|
360 | 361 | # Drop the expired request and we should be good to go again
|
361 | 362 | conn.in_flight_requests.pop(1)
|
362 | 363 | assert not conn.requests_timed_out()
|
| 364 | + |
| 365 | + |
| 366 | +def test_maybe_throttle(conn): |
| 367 | + assert conn.state is ConnectionStates.DISCONNECTED |
| 368 | + assert not conn.throttled() |
| 369 | + |
| 370 | + conn.state = ConnectionStates.CONNECTED |
| 371 | + assert not conn.throttled() |
| 372 | + |
| 373 | + # No throttle_time_ms attribute |
| 374 | + conn._maybe_throttle(HeartbeatResponse[0](error_code=0)) |
| 375 | + assert not conn.throttled() |
| 376 | + |
| 377 | + with mock.patch("time.time", return_value=1000) as time: |
| 378 | + # server-side throttling in v1.0 |
| 379 | + conn.config['api_version'] = (1, 0) |
| 380 | + conn._maybe_throttle(HeartbeatResponse[1](throttle_time_ms=1000, error_code=0)) |
| 381 | + assert not conn.throttled() |
| 382 | + |
| 383 | + # client-side throttling in v2.0 |
| 384 | + conn.config['api_version'] = (2, 0) |
| 385 | + conn._maybe_throttle(HeartbeatResponse[2](throttle_time_ms=1000, error_code=0)) |
| 386 | + assert conn.throttled() |
| 387 | + |
| 388 | + time.return_value = 3000 |
| 389 | + assert not conn.throttled() |
0 commit comments