Skip to content

Commit c00bba8

Browse files
committed
using newer redis_types
1 parent 48f4982 commit c00bba8

File tree

9 files changed

+66
-55
lines changed

9 files changed

+66
-55
lines changed

arq/connections.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dataclasses import dataclass
55
from datetime import datetime, timedelta
66
from operator import attrgetter
7-
from typing import Any, Callable, List, Optional, Tuple, Union
7+
from typing import Any, Callable, List, Optional, Tuple, Union, TYPE_CHECKING
88
from urllib.parse import urlparse
99
from uuid import uuid4
1010

@@ -67,7 +67,13 @@ def __repr__(self) -> str:
6767
expires_extra_ms = 86_400_000
6868

6969

70-
class ArqRedis(Redis): # type: ignore[misc]
70+
if TYPE_CHECKING:
71+
BaseRedis = Redis[bytes]
72+
else:
73+
BaseRedis = Redis
74+
75+
76+
class ArqRedis(BaseRedis):
7177
"""
7278
Thin subclass of ``redis.asyncio.Redis`` which adds :func:`arq.connections.enqueue_job`.
7379
@@ -147,8 +153,8 @@ async def enqueue_job(
147153

148154
job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
149155
pipe.multi()
150-
pipe.psetex(job_key, expires_ms, job)
151-
pipe.zadd(_queue_name, {job_id: score})
156+
pipe.psetex(job_key, expires_ms, job) # type: ignore[no-untyped-call]
157+
pipe.zadd(_queue_name, {job_id: score}) # type: ignore[unused-coroutine]
152158
try:
153159
await pipe.execute()
154160
except WatchError:
@@ -174,7 +180,9 @@ async def all_job_results(self) -> List[JobResult]:
174180
return sorted(results, key=attrgetter('enqueue_time'))
175181

176182
async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
177-
v = await self.get(job_key_prefix + job_id.decode())
183+
key = job_key_prefix + job_id.decode()
184+
v = await self.get(key)
185+
assert v is not None, f'job "{key}" not found'
178186
jd = deserialize_job(v, deserializer=self.job_deserializer)
179187
jd.score = score
180188
return jd
@@ -209,7 +217,7 @@ async def create_pool(
209217
if settings.sentinel:
210218

211219
def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
212-
client = Sentinel(
220+
client = Sentinel( # type: ignore[misc]
213221
*args,
214222
sentinels=settings.host,
215223
ssl=settings.ssl,
@@ -270,12 +278,12 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
270278
)
271279

272280

273-
async def log_redis_info(redis: Redis, log_func: Callable[[str], Any]) -> None:
281+
async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any]) -> None:
274282
async with redis.pipeline(transaction=True) as pipe:
275-
pipe.info(section='Server')
276-
pipe.info(section='Memory')
277-
pipe.info(section='Clients')
278-
pipe.dbsize()
283+
pipe.info(section='Server') # type: ignore[unused-coroutine]
284+
pipe.info(section='Memory') # type: ignore[unused-coroutine]
285+
pipe.info(section='Clients') # type: ignore[unused-coroutine]
286+
pipe.dbsize() # type: ignore[unused-coroutine]
279287
info_server, info_memory, info_clients, key_count = await pipe.execute()
280288

281289
redis_version = info_server.get('redis_version', '?')

arq/jobs.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class Job:
6969
def __init__(
7070
self,
7171
job_id: str,
72-
redis: Redis,
72+
redis: 'Redis[bytes]',
7373
_queue_name: str = default_queue_name,
7474
_deserializer: Optional[Deserializer] = None,
7575
):
@@ -118,7 +118,8 @@ async def info(self) -> Optional[JobDef]:
118118
if v:
119119
info = deserialize_job(v, deserializer=self._deserializer)
120120
if info:
121-
info.score = await self._redis.zscore(self._queue_name, self.job_id)
121+
s = await self._redis.zscore(self._queue_name, self.job_id)
122+
info.score = None if s is None else int(s)
122123
return info
123124

124125
async def result_info(self) -> Optional[JobResult]:

arq/worker.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,8 @@ async def _cancel_aborted_jobs(self) -> None:
358358
Go through job_ids in the abort_jobs_ss sorted set and cancel those tasks.
359359
"""
360360
async with self.pool.pipeline(transaction=True) as pipe:
361-
pipe.zrange(abort_jobs_ss, start=0, end=-1)
362-
pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf'))
361+
pipe.zrange(abort_jobs_ss, start=0, end=-1) # type: ignore[unused-coroutine]
362+
pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf')) # type: ignore[unused-coroutine]
363363
abort_job_ids, _ = await pipe.execute()
364364

365365
aborted: Set[str] = set()
@@ -396,26 +396,26 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
396396
continue
397397

398398
pipe.multi()
399-
pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1')
399+
pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1') # type: ignore[no-untyped-call]
400400
try:
401401
await pipe.execute()
402402
except (ResponseError, WatchError):
403403
# job already started elsewhere since we got 'existing'
404404
self.sem.release()
405405
logger.debug('multi-exec error, job %s already started elsewhere', job_id)
406406
else:
407-
t = self.loop.create_task(self.run_job(job_id, score))
407+
t = self.loop.create_task(self.run_job(job_id, int(score)))
408408
t.add_done_callback(lambda _: self.sem.release())
409409
self.tasks[job_id] = t
410410

411411
async def run_job(self, job_id: str, score: int) -> None: # noqa: C901
412412
start_ms = timestamp_ms()
413413
async with self.pool.pipeline(transaction=True) as pipe:
414-
pipe.get(job_key_prefix + job_id)
415-
pipe.incr(retry_key_prefix + job_id)
416-
pipe.expire(retry_key_prefix + job_id, 88400)
414+
pipe.get(job_key_prefix + job_id) # type: ignore[unused-coroutine]
415+
pipe.incr(retry_key_prefix + job_id) # type: ignore[unused-coroutine]
416+
pipe.expire(retry_key_prefix + job_id, 88400) # type: ignore[unused-coroutine]
417417
if self.allow_abort_jobs:
418-
pipe.zrem(abort_jobs_ss, job_id)
418+
pipe.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine]
419419
v, job_try, _, abort_job = await pipe.execute()
420420
else:
421421
v, job_try, _ = await pipe.execute()
@@ -622,35 +622,35 @@ async def finish_job(
622622
if keep_in_progress is None:
623623
delete_keys += [in_progress_key]
624624
else:
625-
tr.pexpire(in_progress_key, to_ms(keep_in_progress))
625+
tr.pexpire(in_progress_key, to_ms(keep_in_progress)) # type: ignore[unused-coroutine]
626626

627627
if finish:
628628
if result_data:
629629
expire = None if keep_result_forever else result_timeout_s
630-
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))
630+
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine]
631631
delete_keys += [retry_key_prefix + job_id, job_key_prefix + job_id]
632-
tr.zrem(abort_jobs_ss, job_id)
633-
tr.zrem(self.queue_name, job_id)
632+
tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine]
633+
tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine]
634634
elif incr_score:
635-
tr.zincrby(self.queue_name, incr_score, job_id)
635+
tr.zincrby(self.queue_name, incr_score, job_id) # type: ignore[unused-coroutine]
636636
if delete_keys:
637-
tr.delete(*delete_keys)
637+
tr.delete(*delete_keys) # type: ignore[unused-coroutine]
638638
await tr.execute()
639639

640640
async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None:
641641
async with self.pool.pipeline(transaction=True) as tr:
642-
tr.delete(
642+
tr.delete( # type: ignore[unused-coroutine]
643643
retry_key_prefix + job_id,
644644
in_progress_key_prefix + job_id,
645645
job_key_prefix + job_id,
646646
)
647-
tr.zrem(abort_jobs_ss, job_id)
648-
tr.zrem(self.queue_name, job_id)
647+
tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine]
648+
tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine]
649649
# result_data would only be None if serializing the result fails
650650
keep_result = self.keep_result_forever or self.keep_result_s > 0
651651
if result_data is not None and keep_result: # pragma: no branch
652652
expire = 0 if self.keep_result_forever else self.keep_result_s
653-
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))
653+
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine]
654654
await tr.execute()
655655

656656
async def heart_beat(self) -> None:
@@ -703,7 +703,7 @@ async def record_health(self) -> None:
703703
f'{datetime.now():%b-%d %H:%M:%S} j_complete={self.jobs_complete} j_failed={self.jobs_failed} '
704704
f'j_retried={self.jobs_retried} j_ongoing={pending_tasks} queued={queued}'
705705
)
706-
await self.pool.psetex(self.health_check_key, int((self.health_check_interval + 1) * 1000), info.encode())
706+
await self.pool.psetex(self.health_check_key, int((self.health_check_interval + 1) * 1000), info.encode()) # type: ignore[no-untyped-call]
707707
log_suffix = info[info.index('j_complete=') :]
708708
if self._last_health_check_log and log_suffix != self._last_health_check_log:
709709
logger.info('recording health: %s', info)

requirements/docs.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ requests==2.28.1
3737
snowballstemmer==2.2.0
3838
# via sphinx
3939
sphinx==5.1.1
40-
# via -r docs/requirements.in
40+
# via -r requirements/docs.in
4141
sphinxcontrib-applehelp==1.0.2
4242
# via sphinx
4343
sphinxcontrib-devhelp==1.0.2

requirements/linting.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ mypy==0.971
66
pycodestyle==2.9.1
77
pyflakes==2.5.0
88
types-pytz==2022.2.1.0
9-
types_redis==4.1.17
9+
types_redis==4.2.8

requirements/linting.txt

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,23 @@
55
# pip-compile --output-file=requirements/linting.txt requirements/linting.in
66
#
77
black==22.6.0
8-
# via -r tests/requirements-linting.in
8+
# via -r requirements/linting.in
99
click==8.1.3
1010
# via black
1111
colorama==0.4.5
1212
# via isort
1313
flake8==5.0.4
1414
# via
15-
# -r tests/requirements-linting.in
15+
# -r requirements/linting.in
1616
# flake8-quotes
1717
flake8-quotes==3.3.1
18-
# via -r tests/requirements-linting.in
18+
# via -r requirements/linting.in
1919
isort[colors]==5.10.1
20-
# via -r tests/requirements-linting.in
20+
# via -r requirements/linting.in
2121
mccabe==0.7.0
2222
# via flake8
2323
mypy==0.971
24-
# via -r tests/requirements-linting.in
24+
# via -r requirements/linting.in
2525
mypy-extensions==0.4.3
2626
# via
2727
# black
@@ -32,20 +32,20 @@ platformdirs==2.5.2
3232
# via black
3333
pycodestyle==2.9.1
3434
# via
35-
# -r tests/requirements-linting.in
35+
# -r requirements/linting.in
3636
# flake8
3737
pyflakes==2.5.0
3838
# via
39-
# -r tests/requirements-linting.in
39+
# -r requirements/linting.in
4040
# flake8
4141
tomli==2.0.1
4242
# via
4343
# black
4444
# mypy
4545
types-pytz==2022.2.1.0
46-
# via -r tests/requirements-linting.in
47-
types-redis==4.1.17
48-
# via -r tests/requirements-linting.in
46+
# via -r requirements/linting.in
47+
types-redis==4.2.8
48+
# via -r requirements/linting.in
4949
typing-extensions==4.3.0
5050
# via
5151
# black

requirements/setup.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@ click==8.1.3
1010
# via arq (setup.py)
1111
deprecated==1.2.13
1212
# via redis
13+
hiredis==2.0.0
14+
# via redis
1315
packaging==21.3
1416
# via redis
1517
pydantic==1.9.2
1618
# via arq (setup.py)
1719
pyparsing==3.0.9
1820
# via packaging
19-
redis==4.3.4
21+
redis[hiredis]==4.2.2
2022
# via arq (setup.py)
2123
typing-extensions==4.3.0
2224
# via

requirements/testing.txt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
attrs==22.1.0
88
# via pytest
99
coverage[toml]==6.4.4
10-
# via -r tests/requirements-testing.in
10+
# via -r requirements/testing.in
1111
dirty-equals==0.4
12-
# via -r tests/requirements-testing.in
12+
# via -r requirements/testing.in
1313
iniconfig==1.1.1
1414
# via pytest
1515
msgpack==1.0.4
16-
# via -r tests/requirements-testing.in
16+
# via -r requirements/testing.in
1717
packaging==21.3
1818
# via
1919
# pytest
@@ -26,22 +26,22 @@ pyparsing==3.0.9
2626
# via packaging
2727
pytest==7.1.2
2828
# via
29-
# -r tests/requirements-testing.in
29+
# -r requirements/testing.in
3030
# pytest-asyncio
3131
# pytest-mock
3232
# pytest-sugar
3333
# pytest-timeout
3434
pytest-asyncio==0.19.0
35-
# via -r tests/requirements-testing.in
35+
# via -r requirements/testing.in
3636
pytest-mock==3.8.2
37-
# via -r tests/requirements-testing.in
37+
# via -r requirements/testing.in
3838
pytest-sugar==0.9.5
39-
# via -r tests/requirements-testing.in
39+
# via -r requirements/testing.in
4040
pytest-timeout==2.1.0
41-
# via -r tests/requirements-testing.in
41+
# via -r requirements/testing.in
4242
pytz==2022.2.1
4343
# via
44-
# -r tests/requirements-testing.in
44+
# -r requirements/testing.in
4545
# dirty-equals
4646
termcolor==1.1.0
4747
# via pytest-sugar

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
arq=arq.cli:cli
5757
""",
5858
install_requires=[
59-
'redis>=4.2.0,<5.0.0',
59+
'redis[hiredis]>=4.2.0,<4.3.0',
6060
'click>=8.0,<9.0',
6161
'pydantic>=1.9.2,<2',
6262
'typing-extensions>=4.1.0,<5.0.0',

0 commit comments

Comments
 (0)