diff --git a/third_party/2and3/redis/client.pyi b/third_party/2and3/redis/client.pyi index 2e780242cc47..3da396077f7a 100644 --- a/third_party/2and3/redis/client.pyi +++ b/third_party/2and3/redis/client.pyi @@ -1,5 +1,22 @@ from datetime import timedelta -from typing import Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional, Sequence, Set, Text, Tuple, Union, overload +from typing import ( + Any, + Callable, + Dict, + Generic, + Iterable, + Iterator, + List, + Mapping, + Optional, + Sequence, + Set, + Text, + Tuple, + TypeVar, + Union, + overload, +) from typing_extensions import Literal from .connection import ConnectionPool @@ -38,14 +55,174 @@ def parse_slowlog_get(response, **options): ... _Value = Union[bytes, float, int, Text] _Key = Union[Text, bytes] -class Redis(object): +# Lib returns str or bytes depending on Python version and value of decode_responses +_StrType = TypeVar("_StrType", bound=Union[Text, bytes]) + +class Redis(Generic[_StrType]): RESPONSE_CALLBACKS: Any + @overload + @classmethod + def from_url( + cls, + url: Text, + host: Optional[Text] = ..., + port: Optional[int] = ..., + db: Optional[int] = ..., + password: Optional[Text] = ..., + socket_timeout: Optional[float] = ..., + socket_connect_timeout: Optional[float] = ..., + socket_keepalive: Optional[bool] = ..., + socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ..., + connection_pool: Optional[ConnectionPool] = ..., + unix_socket_path: Optional[Text] = ..., + encoding: Text = ..., + encoding_errors: Text = ..., + charset: Optional[Text] = ..., + errors: Optional[Text] = ..., + decode_responses: Optional[bool] = ..., + retry_on_timeout: bool = ..., + ssl: bool = ..., + ssl_keyfile: Optional[Text] = ..., + ssl_certfile: Optional[Text] = ..., + ssl_cert_reqs: Optional[Union[str, int]] = ..., + ssl_ca_certs: Optional[Text] = ..., + ssl_check_hostname: bool = ..., + max_connections: Optional[int] = ..., + single_connection_client: bool = ..., + health_check_interval: float = ..., + client_name: Optional[Text] = ..., + username: Optional[Text] = ..., + ) -> Redis[bytes]: ... + @overload @classmethod - def from_url(cls, url: Text, db: Optional[int] = ..., **kwargs) -> Redis: ... + def from_url( + cls, + url: Text, + host: Optional[Text] = ..., + port: Optional[int] = ..., + db: Optional[int] = ..., + password: Optional[Text] = ..., + socket_timeout: Optional[float] = ..., + socket_connect_timeout: Optional[float] = ..., + socket_keepalive: Optional[bool] = ..., + socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ..., + connection_pool: Optional[ConnectionPool] = ..., + unix_socket_path: Optional[Text] = ..., + encoding: Text = ..., + encoding_errors: Text = ..., + charset: Optional[Text] = ..., + decode_responses: Literal[True] = ..., + errors: Optional[Text] = ..., + retry_on_timeout: bool = ..., + ssl: bool = ..., + ssl_keyfile: Optional[Text] = ..., + ssl_certfile: Optional[Text] = ..., + ssl_cert_reqs: Optional[Union[str, int]] = ..., + ssl_ca_certs: Optional[Text] = ..., + ssl_check_hostname: bool = ..., + max_connections: Optional[int] = ..., + single_connection_client: bool = ..., + health_check_interval: float = ..., + client_name: Optional[Text] = ..., + username: Optional[Text] = ..., + ) -> Redis[str]: ... connection_pool: Any response_callbacks: Any + @overload + def __new__( + cls, + host: Text = ..., + port: int = ..., + db: int = ..., + password: Optional[Text] = ..., + socket_timeout: Optional[float] = ..., + socket_connect_timeout: Optional[float] = ..., + socket_keepalive: Optional[bool] = ..., + socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ..., + connection_pool: Optional[ConnectionPool] = ..., + unix_socket_path: Optional[Text] = ..., + encoding: Text = ..., + encoding_errors: Text = ..., + charset: Optional[Text] = ..., + errors: Optional[Text] = ..., + retry_on_timeout: bool = ..., + ssl: bool = ..., + ssl_keyfile: Optional[Text] = ..., + ssl_certfile: Optional[Text] = ..., + ssl_cert_reqs: Optional[Union[str, int]] = ..., + ssl_ca_certs: Optional[Text] = ..., + ssl_check_hostname: bool = ..., + max_connections: Optional[int] = ..., + single_connection_client: bool = ..., + health_check_interval: float = ..., + client_name: Optional[Text] = ..., + username: Optional[Text] = ..., + ) -> Redis[bytes]: ... + @overload + def __new__( + cls, + host: Text = ..., + port: int = ..., + db: int = ..., + password: Optional[Text] = ..., + socket_timeout: Optional[float] = ..., + socket_connect_timeout: Optional[float] = ..., + socket_keepalive: Optional[bool] = ..., + socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ..., + connection_pool: Optional[ConnectionPool] = ..., + unix_socket_path: Optional[Text] = ..., + encoding: Text = ..., + encoding_errors: Text = ..., + charset: Optional[Text] = ..., + errors: Optional[Text] = ..., + decode_responses: Literal[True] = ..., + retry_on_timeout: bool = ..., + ssl: bool = ..., + ssl_keyfile: Optional[Text] = ..., + ssl_certfile: Optional[Text] = ..., + ssl_cert_reqs: Optional[Union[str, int]] = ..., + ssl_ca_certs: Optional[Text] = ..., + ssl_check_hostname: bool = ..., + max_connections: Optional[int] = ..., + single_connection_client: bool = ..., + health_check_interval: float = ..., + client_name: Optional[Text] = ..., + username: Optional[Text] = ..., + ) -> Redis[str]: ... + @overload def __init__( - self, + self: Redis[str], + host: Text = ..., + port: int = ..., + db: int = ..., + password: Optional[Text] = ..., + socket_timeout: Optional[float] = ..., + socket_connect_timeout: Optional[float] = ..., + socket_keepalive: Optional[bool] = ..., + socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ..., + connection_pool: Optional[ConnectionPool] = ..., + unix_socket_path: Optional[Text] = ..., + encoding: Text = ..., + encoding_errors: Text = ..., + charset: Optional[Text] = ..., + errors: Optional[Text] = ..., + decode_responses: Literal[True] = ..., + retry_on_timeout: bool = ..., + ssl: bool = ..., + ssl_keyfile: Optional[Text] = ..., + ssl_certfile: Optional[Text] = ..., + ssl_cert_reqs: Optional[Union[str, int]] = ..., + ssl_ca_certs: Optional[Text] = ..., + ssl_check_hostname: bool = ..., + max_connections: Optional[int] = ..., + single_connection_client: bool = ..., + health_check_interval: float = ..., + client_name: Optional[Text] = ..., + username: Optional[Text] = ..., + ) -> None: ... + @overload + def __init__( + self: Redis[bytes], host: Text = ..., port: int = ..., db: int = ..., @@ -60,7 +237,7 @@ class Redis(object): encoding_errors: Text = ..., charset: Optional[Text] = ..., errors: Optional[Text] = ..., - decode_responses: bool = ..., + decode_responses: Optional[bool] = ..., retry_on_timeout: bool = ..., ssl: bool = ..., ssl_keyfile: Optional[Text] = ..., @@ -153,11 +330,11 @@ class Redis(object): __contains__: Any def expire(self, name: _Key, time: Union[int, timedelta]) -> bool: ... def expireat(self, name, when): ... - def get(self, name: _Key) -> Any: ... # Optional[Union[str, bytes]] depending on decode_responses + def get(self, name: _Key) -> Optional[_StrType]: ... def __getitem__(self, name): ... def getbit(self, name: _Key, offset: int) -> int: ... def getrange(self, key, start, end): ... - def getset(self, name, value): ... + def getset(self, name, value) -> Optional[_StrType]: ... def incr(self, name, amount=...): ... def incrby(self, name, amount=...): ... def incrbyfloat(self, name, amount=...): ... @@ -196,10 +373,10 @@ class Redis(object): def watch(self, *names): ... def unlink(self, *names: _Key) -> int: ... def unwatch(self): ... - def blpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[bytes, bytes]]: ... - def brpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[bytes, bytes]]: ... + def blpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[_StrType, _StrType]]: ... + def brpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[_StrType, _StrType]]: ... def brpoplpush(self, src, dst, timeout=...): ... - def lindex(self, name: _Key, index: int) -> Optional[bytes]: ... + def lindex(self, name: _Key, index: int) -> Optional[_StrType]: ... def linsert( self, name: _Key, where: Literal["BEFORE", "AFTER", "before", "after"], refvalue: _Value, value: _Value ) -> int: ... @@ -207,7 +384,7 @@ class Redis(object): def lpop(self, name): ... def lpush(self, name: _Value, *values: _Value) -> int: ... def lpushx(self, name, value): ... - def lrange(self, name: _Key, start: int, end: int) -> List[bytes]: ... + def lrange(self, name: _Key, start: int, end: int) -> List[_StrType]: ... def lrem(self, name: _Key, count: int, value: _Value) -> int: ... def lset(self, name: _Key, index: int, value: _Value) -> bool: ... def ltrim(self, name: _Key, start: int, end: int) -> bool: ... @@ -227,7 +404,7 @@ class Redis(object): alpha: bool = ..., store: None = ..., groups: bool = ..., - ) -> List[bytes]: ... + ) -> List[_StrType]: ... @overload def sort( self, @@ -255,15 +432,13 @@ class Redis(object): store: _Key, groups: bool = ..., ) -> int: ... - def scan( - self, cursor: int = ..., match: Optional[_Key] = ..., count: Optional[int] = ... - ) -> Tuple[int, List[Any]]: ... # Tuple[int, List[_Key]] depending on decode_responses - def scan_iter( - self, match: Optional[Text] = ..., count: Optional[int] = ... - ) -> Iterator[Any]: ... # Iterator[_Key] depending on decode_responses - def sscan(self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...) -> Tuple[int, List[bytes]]: ... + def scan(self, cursor: int = ..., match: Optional[_Key] = ..., count: Optional[int] = ...) -> Tuple[int, List[_StrType]]: ... + def scan_iter(self, match: Optional[Text] = ..., count: Optional[int] = ...) -> Iterator[_StrType]: ... + def sscan(self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...) -> Tuple[int, List[_StrType]]: ... def sscan_iter(self, name, match=..., count=...): ... - def hscan(self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...) -> Tuple[int, Dict[bytes, bytes]]: ... + def hscan( + self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ... + ) -> Tuple[int, Dict[_StrType, _StrType]]: ... def hscan_iter(self, name, match=..., count=...): ... def zscan(self, name, cursor=..., match=..., count=..., score_cast_func=...): ... def zscan_iter(self, name, match=..., count=..., score_cast_func=...): ... @@ -345,19 +520,19 @@ class Redis(object): def pfmerge(self, dest: _Key, *sources: _Key) -> bool: ... def hdel(self, name: _Key, *keys: _Key) -> int: ... def hexists(self, name: _Key, key: _Key) -> bool: ... - def hget(self, name: _Key, key: _Key) -> Optional[bytes]: ... - def hgetall(self, name: _Key) -> Dict[bytes, bytes]: ... + def hget(self, name: _Key, key: _Key) -> Optional[_StrType]: ... + def hgetall(self, name: _Key) -> Dict[_StrType, _StrType]: ... def hincrby(self, name: _Key, key: _Key, amount: int = ...) -> int: ... def hincrbyfloat(self, name: _Key, key: _Key, amount: float = ...) -> float: ... - def hkeys(self, name: _Key) -> List[bytes]: ... + def hkeys(self, name: _Key) -> List[_StrType]: ... def hlen(self, name: _Key) -> int: ... def hset( self, name: _Key, key: Optional[_Key], value: Optional[_Value], mapping: Optional[Mapping[_Value, _Value]] = ... ) -> int: ... def hsetnx(self, name: _Key, key: _Key, value: _Value) -> int: ... def hmset(self, name: _Key, mapping: Mapping[_Value, _Value]) -> bool: ... - def hmget(self, name: _Key, keys: Union[_Key, Iterable[_Key]], *args: _Key) -> List[Optional[bytes]]: ... - def hvals(self, name: _Key) -> List[bytes]: ... + def hmget(self, name: _Key, keys: Union[_Key, Iterable[_Key]], *args: _Key) -> List[Optional[_StrType]]: ... + def hvals(self, name: _Key) -> List[_StrType]: ... def publish(self, channel: _Key, message: _Key) -> int: ... def eval(self, script, numkeys, *keys_and_args): ... def evalsha(self, sha, numkeys, *keys_and_args): ... @@ -365,7 +540,7 @@ class Redis(object): def script_flush(self): ... def script_kill(self): ... def script_load(self, script): ... - def register_script(self, script: Union[Text, bytes]) -> Script: ... + def register_script(self, script: Union[Text, _StrType]) -> Script: ... def pubsub_channels(self, pattern: _Key = ...) -> List[Text]: ... def pubsub_numsub(self, *args: _Key) -> List[Tuple[Text, int]]: ... def pubsub_numpat(self) -> int: ...