Skip to content

Commit d8c99c0

Browse files
committed
using newer redis_types
1 parent 15866cf commit d8c99c0

File tree

9 files changed

+66
-55
lines changed

9 files changed

+66
-55
lines changed

arq/connections.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from dataclasses import dataclass
66
from datetime import datetime, timedelta
77
from operator import attrgetter
8-
from typing import Any, Callable, Generator, List, Optional, Tuple, Union
8+
from typing import Any, Callable, Generator, List, Optional, Tuple, Union, TYPE_CHECKING
99
from urllib.parse import urlparse
1010
from uuid import uuid4
1111

@@ -73,7 +73,13 @@ def __repr__(self) -> str:
7373
expires_extra_ms = 86_400_000
7474

7575

76-
class ArqRedis(Redis): # type: ignore[misc]
76+
if TYPE_CHECKING:
77+
BaseRedis = Redis[bytes]
78+
else:
79+
BaseRedis = Redis
80+
81+
82+
class ArqRedis(BaseRedis):
7783
"""
7884
Thin subclass of ``redis.asyncio.Redis`` which adds :func:`arq.connections.enqueue_job`.
7985
@@ -153,8 +159,8 @@ async def enqueue_job(
153159

154160
job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
155161
pipe.multi()
156-
pipe.psetex(job_key, expires_ms, job)
157-
pipe.zadd(_queue_name, {job_id: score})
162+
pipe.psetex(job_key, expires_ms, job) # type: ignore[no-untyped-call]
163+
pipe.zadd(_queue_name, {job_id: score}) # type: ignore[unused-coroutine]
158164
try:
159165
await pipe.execute()
160166
except WatchError:
@@ -180,7 +186,9 @@ async def all_job_results(self) -> List[JobResult]:
180186
return sorted(results, key=attrgetter('enqueue_time'))
181187

182188
async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
183-
v = await self.get(job_key_prefix + job_id.decode())
189+
key = job_key_prefix + job_id.decode()
190+
v = await self.get(key)
191+
assert v is not None, f'job "{key}" not found'
184192
jd = deserialize_job(v, deserializer=self.job_deserializer)
185193
jd.score = score
186194
return jd
@@ -215,7 +223,7 @@ async def create_pool(
215223
if settings.sentinel:
216224

217225
def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
218-
client = Sentinel(*args, sentinels=settings.host, ssl=settings.ssl, **kwargs)
226+
client = Sentinel(*args, sentinels=settings.host, ssl=settings.ssl, **kwargs) # type: ignore[misc]
219227
return client.master_for(settings.sentinel_master, redis_class=ArqRedis)
220228

221229
else:
@@ -265,12 +273,12 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
265273
)
266274

267275

268-
async def log_redis_info(redis: Redis, log_func: Callable[[str], Any]) -> None:
276+
async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any]) -> None:
269277
async with redis.pipeline(transaction=True) as pipe:
270-
pipe.info(section='Server')
271-
pipe.info(section='Memory')
272-
pipe.info(section='Clients')
273-
pipe.dbsize()
278+
pipe.info(section='Server') # type: ignore[unused-coroutine]
279+
pipe.info(section='Memory') # type: ignore[unused-coroutine]
280+
pipe.info(section='Clients') # type: ignore[unused-coroutine]
281+
pipe.dbsize() # type: ignore[unused-coroutine]
274282
info_server, info_memory, info_clients, key_count = await pipe.execute()
275283

276284
redis_version = info_server.get('redis_version', '?')

arq/jobs.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class Job:
6969
def __init__(
7070
self,
7171
job_id: str,
72-
redis: Redis,
72+
redis: 'Redis[bytes]',
7373
_queue_name: str = default_queue_name,
7474
_deserializer: Optional[Deserializer] = None,
7575
):
@@ -118,7 +118,8 @@ async def info(self) -> Optional[JobDef]:
118118
if v:
119119
info = deserialize_job(v, deserializer=self._deserializer)
120120
if info:
121-
info.score = await self._redis.zscore(self._queue_name, self.job_id)
121+
s = await self._redis.zscore(self._queue_name, self.job_id)
122+
info.score = None if s is None else int(s)
122123
return info
123124

124125
async def result_info(self) -> Optional[JobResult]:

arq/worker.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,8 @@ async def _cancel_aborted_jobs(self) -> None:
358358
Go through job_ids in the abort_jobs_ss sorted set and cancel those tasks.
359359
"""
360360
async with self.pool.pipeline(transaction=True) as pipe:
361-
pipe.zrange(abort_jobs_ss, start=0, end=-1)
362-
pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf'))
361+
pipe.zrange(abort_jobs_ss, start=0, end=-1) # type: ignore[unused-coroutine]
362+
pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf')) # type: ignore[unused-coroutine]
363363
abort_job_ids, _ = await pipe.execute()
364364

365365
aborted: Set[str] = set()
@@ -396,26 +396,26 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
396396
continue
397397

398398
pipe.multi()
399-
pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1')
399+
pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1') # type: ignore[no-untyped-call]
400400
try:
401401
await pipe.execute()
402402
except (ResponseError, WatchError):
403403
# job already started elsewhere since we got 'existing'
404404
self.sem.release()
405405
logger.debug('multi-exec error, job %s already started elsewhere', job_id)
406406
else:
407-
t = self.loop.create_task(self.run_job(job_id, score))
407+
t = self.loop.create_task(self.run_job(job_id, int(score)))
408408
t.add_done_callback(lambda _: self.sem.release())
409409
self.tasks[job_id] = t
410410

411411
async def run_job(self, job_id: str, score: int) -> None: # noqa: C901
412412
start_ms = timestamp_ms()
413413
async with self.pool.pipeline(transaction=True) as pipe:
414-
pipe.get(job_key_prefix + job_id)
415-
pipe.incr(retry_key_prefix + job_id)
416-
pipe.expire(retry_key_prefix + job_id, 88400)
414+
pipe.get(job_key_prefix + job_id) # type: ignore[unused-coroutine]
415+
pipe.incr(retry_key_prefix + job_id) # type: ignore[unused-coroutine]
416+
pipe.expire(retry_key_prefix + job_id, 88400) # type: ignore[unused-coroutine]
417417
if self.allow_abort_jobs:
418-
pipe.zrem(abort_jobs_ss, job_id)
418+
pipe.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine]
419419
v, job_try, _, abort_job = await pipe.execute()
420420
else:
421421
v, job_try, _ = await pipe.execute()
@@ -622,35 +622,35 @@ async def finish_job(
622622
if keep_in_progress is None:
623623
delete_keys += [in_progress_key]
624624
else:
625-
tr.pexpire(in_progress_key, to_ms(keep_in_progress))
625+
tr.pexpire(in_progress_key, to_ms(keep_in_progress)) # type: ignore[unused-coroutine]
626626

627627
if finish:
628628
if result_data:
629629
expire = None if keep_result_forever else result_timeout_s
630-
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))
630+
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine]
631631
delete_keys += [retry_key_prefix + job_id, job_key_prefix + job_id]
632-
tr.zrem(abort_jobs_ss, job_id)
633-
tr.zrem(self.queue_name, job_id)
632+
tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine]
633+
tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine]
634634
elif incr_score:
635-
tr.zincrby(self.queue_name, incr_score, job_id)
635+
tr.zincrby(self.queue_name, incr_score, job_id) # type: ignore[unused-coroutine]
636636
if delete_keys:
637-
tr.delete(*delete_keys)
637+
tr.delete(*delete_keys) # type: ignore[unused-coroutine]
638638
await tr.execute()
639639

640640
async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None:
641641
async with self.pool.pipeline(transaction=True) as tr:
642-
tr.delete(
642+
tr.delete( # type: ignore[unused-coroutine]
643643
retry_key_prefix + job_id,
644644
in_progress_key_prefix + job_id,
645645
job_key_prefix + job_id,
646646
)
647-
tr.zrem(abort_jobs_ss, job_id)
648-
tr.zrem(self.queue_name, job_id)
647+
tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine]
648+
tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine]
649649
# result_data would only be None if serializing the result fails
650650
keep_result = self.keep_result_forever or self.keep_result_s > 0
651651
if result_data is not None and keep_result: # pragma: no branch
652652
expire = 0 if self.keep_result_forever else self.keep_result_s
653-
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))
653+
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine]
654654
await tr.execute()
655655

656656
async def heart_beat(self) -> None:
@@ -703,7 +703,7 @@ async def record_health(self) -> None:
703703
f'{datetime.now():%b-%d %H:%M:%S} j_complete={self.jobs_complete} j_failed={self.jobs_failed} '
704704
f'j_retried={self.jobs_retried} j_ongoing={pending_tasks} queued={queued}'
705705
)
706-
await self.pool.psetex(self.health_check_key, int((self.health_check_interval + 1) * 1000), info.encode())
706+
await self.pool.psetex(self.health_check_key, int((self.health_check_interval + 1) * 1000), info.encode()) # type: ignore[no-untyped-call]
707707
log_suffix = info[info.index('j_complete=') :]
708708
if self._last_health_check_log and log_suffix != self._last_health_check_log:
709709
logger.info('recording health: %s', info)

requirements/docs.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ requests==2.28.1
3737
snowballstemmer==2.2.0
3838
# via sphinx
3939
sphinx==5.1.1
40-
# via -r docs/requirements.in
40+
# via -r requirements/docs.in
4141
sphinxcontrib-applehelp==1.0.2
4242
# via sphinx
4343
sphinxcontrib-devhelp==1.0.2

requirements/linting.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ mypy==0.971
66
pycodestyle==2.9.1
77
pyflakes==2.5.0
88
types-pytz==2022.2.1.0
9-
types_redis==4.1.17
9+
types_redis==4.2.8

requirements/linting.txt

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,23 @@
55
# pip-compile --output-file=requirements/linting.txt requirements/linting.in
66
#
77
black==22.6.0
8-
# via -r tests/requirements-linting.in
8+
# via -r requirements/linting.in
99
click==8.1.3
1010
# via black
1111
colorama==0.4.5
1212
# via isort
1313
flake8==5.0.4
1414
# via
15-
# -r tests/requirements-linting.in
15+
# -r requirements/linting.in
1616
# flake8-quotes
1717
flake8-quotes==3.3.1
18-
# via -r tests/requirements-linting.in
18+
# via -r requirements/linting.in
1919
isort[colors]==5.10.1
20-
# via -r tests/requirements-linting.in
20+
# via -r requirements/linting.in
2121
mccabe==0.7.0
2222
# via flake8
2323
mypy==0.971
24-
# via -r tests/requirements-linting.in
24+
# via -r requirements/linting.in
2525
mypy-extensions==0.4.3
2626
# via
2727
# black
@@ -32,20 +32,20 @@ platformdirs==2.5.2
3232
# via black
3333
pycodestyle==2.9.1
3434
# via
35-
# -r tests/requirements-linting.in
35+
# -r requirements/linting.in
3636
# flake8
3737
pyflakes==2.5.0
3838
# via
39-
# -r tests/requirements-linting.in
39+
# -r requirements/linting.in
4040
# flake8
4141
tomli==2.0.1
4242
# via
4343
# black
4444
# mypy
4545
types-pytz==2022.2.1.0
46-
# via -r tests/requirements-linting.in
47-
types-redis==4.1.17
48-
# via -r tests/requirements-linting.in
46+
# via -r requirements/linting.in
47+
types-redis==4.2.8
48+
# via -r requirements/linting.in
4949
typing-extensions==4.3.0
5050
# via
5151
# black

requirements/setup.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@ click==8.1.3
1010
# via arq (setup.py)
1111
deprecated==1.2.13
1212
# via redis
13+
hiredis==2.0.0
14+
# via redis
1315
packaging==21.3
1416
# via redis
1517
pydantic==1.9.2
1618
# via arq (setup.py)
1719
pyparsing==3.0.9
1820
# via packaging
19-
redis==4.3.4
21+
redis[hiredis]==4.2.2
2022
# via arq (setup.py)
2123
typing-extensions==4.3.0
2224
# via

requirements/testing.txt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
attrs==22.1.0
88
# via pytest
99
coverage[toml]==6.4.4
10-
# via -r tests/requirements-testing.in
10+
# via -r requirements/testing.in
1111
dirty-equals==0.4
12-
# via -r tests/requirements-testing.in
12+
# via -r requirements/testing.in
1313
iniconfig==1.1.1
1414
# via pytest
1515
msgpack==1.0.4
16-
# via -r tests/requirements-testing.in
16+
# via -r requirements/testing.in
1717
packaging==21.3
1818
# via
1919
# pytest
@@ -26,22 +26,22 @@ pyparsing==3.0.9
2626
# via packaging
2727
pytest==7.1.2
2828
# via
29-
# -r tests/requirements-testing.in
29+
# -r requirements/testing.in
3030
# pytest-asyncio
3131
# pytest-mock
3232
# pytest-sugar
3333
# pytest-timeout
3434
pytest-asyncio==0.19.0
35-
# via -r tests/requirements-testing.in
35+
# via -r requirements/testing.in
3636
pytest-mock==3.8.2
37-
# via -r tests/requirements-testing.in
37+
# via -r requirements/testing.in
3838
pytest-sugar==0.9.5
39-
# via -r tests/requirements-testing.in
39+
# via -r requirements/testing.in
4040
pytest-timeout==2.1.0
41-
# via -r tests/requirements-testing.in
41+
# via -r requirements/testing.in
4242
pytz==2022.2.1
4343
# via
44-
# -r tests/requirements-testing.in
44+
# -r requirements/testing.in
4545
# dirty-equals
4646
termcolor==1.1.0
4747
# via pytest-sugar

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
arq=arq.cli:cli
5757
""",
5858
install_requires=[
59-
'redis>=4.2.0,<5.0.0',
59+
'redis[hiredis]>=4.2.0,<4.3.0',
6060
'click>=8.0,<9.0',
6161
'pydantic>=1.9.2,<2',
6262
'typing-extensions>=4.1.0,<5.0.0',

0 commit comments

Comments
 (0)