diff --git a/pyrightconfig.stricter.json b/pyrightconfig.stricter.json index c4ecf748ca50..6c44f8a39a9f 100644 --- a/pyrightconfig.stricter.json +++ b/pyrightconfig.stricter.json @@ -60,6 +60,7 @@ "stubs/openpyxl", "stubs/Pillow", "stubs/paramiko", + "stubs/peewee", "stubs/prettytable", "stubs/protobuf", "stubs/pytz/pytz/lazy.pyi", diff --git a/stubs/peewee/METADATA.toml b/stubs/peewee/METADATA.toml new file mode 100644 index 000000000000..719172f4da80 --- /dev/null +++ b/stubs/peewee/METADATA.toml @@ -0,0 +1 @@ +version = "3.14.10.*" diff --git a/stubs/peewee/peewee.pyi b/stubs/peewee/peewee.pyi new file mode 100644 index 000000000000..7425bfe3bf85 --- /dev/null +++ b/stubs/peewee/peewee.pyi @@ -0,0 +1,2135 @@ +import datetime +import decimal +import enum +import threading +import uuid +from _typeshed import Incomplete, Self +from collections.abc import Callable, Container, Hashable, Iterable, Iterator, Mapping, MutableMapping, MutableSet, Sequence +from contextlib import AbstractContextManager +from typing import Any, AnyStr, ClassVar, Generic, NamedTuple, NoReturn, Pattern, Protocol, TypeVar, Union, overload +from typing_extensions import Concatenate, Literal, ParamSpec, TypeAlias + +_T = TypeVar("_T") +_ModelT = TypeVar("_ModelT", bound=Model) +_FuncT = TypeVar("_FuncT", bound=Callable[..., Any]) +_ClassT = TypeVar("_ClassT", bound=type) +_FieldT = TypeVar("_FieldT", bound=Field) +_NodeT = TypeVar("_NodeT", bound=Node) +_P = ParamSpec("_P") + +_ConvFunc: TypeAlias = Callable[[Any], Any] + +__version__: str + +__all__ = [ + "AnyField", + "AsIs", + "AutoField", + "BareField", + "BigAutoField", + "BigBitField", + "BigIntegerField", + "BinaryUUIDField", + "BitField", + "BlobField", + "BooleanField", + "Case", + "Cast", + "CharField", + "Check", + "chunked", + "Column", + "CompositeKey", + "Context", + "Database", + "DatabaseError", + "DatabaseProxy", + "DataError", + "DateField", + "DateTimeField", + "DecimalField", + "DeferredForeignKey", + "DeferredThroughModel", + "DJANGO_MAP", + "DoesNotExist", + "DoubleField", + "DQ", + "EXCLUDED", + "Field", + "FixedCharField", + "FloatField", + "fn", + "ForeignKeyField", + "IdentityField", + "ImproperlyConfigured", + "Index", + "IntegerField", + "IntegrityError", + "InterfaceError", + "InternalError", + "IPField", + "JOIN", + "ManyToManyField", + "Model", + "ModelIndex", + "MySQLDatabase", + "NotSupportedError", + "OP", + "OperationalError", + "PostgresqlDatabase", + "PrimaryKeyField", # XXX: Deprecated, change to AutoField. + "prefetch", + "ProgrammingError", + "Proxy", + "QualifiedNames", + "SchemaManager", + "SmallIntegerField", + "Select", + "SQL", + "SqliteDatabase", + "Table", + "TextField", + "TimeField", + "TimestampField", + "Tuple", + "UUIDField", + "Value", + "ValuesList", + "Window", +] + +AnyField: Incomplete +chunked: Incomplete +Tuple: Incomplete + +class __ICursor(Protocol): + description: tuple[str, Any, Any, Any, Any, Any, Any] + rowcount: int + def fetchone(self) -> tuple[Incomplete, ...] | None: ... + def fetchmany(self, size: int = ...) -> Iterable[tuple[Incomplete, ...]]: ... + def fetchall(self) -> Iterable[tuple[Incomplete, ...]]: ... + +class __IConnection(Protocol): + def cursor(self) -> __ICursor: ... + def execute(self, sql: str, *args: object) -> __ICursor: ... + def commit(self) -> Any: ... + def rollback(self) -> Any: ... + +class __IAggregate(Protocol): + def step(self, *value: object) -> None: ... + def finalize(self) -> Any: ... + +class __ITableFunction(Protocol): + columns: Sequence[str] + params: Sequence[str] + name: str + print_tracebacks: bool + def initialize(self, **parameters: object) -> None: ... + def iterate(self, idx: int) -> tuple[Incomplete, ...]: ... + @classmethod + def register(cls, conn: __IConnection) -> None: ... + +def _sqlite_date_part(lookup_type: str, datetime_string: str) -> str | None: ... +def _sqlite_date_trunc(lookup_type: str, datetime_string: str) -> str | None: ... + +class attrdict(dict[str, object]): + def __getattr__(self, attr: str) -> Any: ... + def __setattr__(self, attr: str, value: object) -> None: ... + def __iadd__(self: Self, rhs: Mapping[str, object]) -> Self: ... + def __add__(self, rhs: Mapping[str, object]) -> Mapping[str, object]: ... + +class _TSentinel(enum.Enum): ... + +# HACK (dargueta): This is a regular object but we need it to annotate the sentinel in +# type arguments. +SENTINEL: _TSentinel + +OP: attrdict + +DJANGO_MAP: attrdict + +FIELD: attrdict + +JOIN: attrdict + +ROW: attrdict + +SCOPE_NORMAL: int +SCOPE_SOURCE: int +SCOPE_VALUES: int +SCOPE_CTE: int +SCOPE_COLUMN: int + +CSQ_PARENTHESES_NEVER: int +CSQ_PARENTHESES_ALWAYS: int +CSQ_PARENTHESES_UNNESTED: int + +SNAKE_CASE_STEP1: Pattern[str] +SNAKE_CASE_STEP2: Pattern[str] + +MODEL_BASE: str + +# TODO (dargueta) +class _callable_context_manager: + def __call__(self, fn: _FuncT) -> _FuncT: ... + +class Proxy: + obj: Any + def initialize(self, obj: object) -> None: ... + def attach_callback(self, callback: _ConvFunc) -> _ConvFunc: ... + @staticmethod # This is technically inaccurate but that's how it's used + def passthrough(method: _FuncT) -> _FuncT: ... + def __enter__(self) -> Any: ... + def __exit__(self, exc_type, exc_val, exc_tb) -> Any: ... + def __getattr__(self, attr: str) -> Any: ... + def __setattr__(self, attr: str, value: object) -> None: ... + +class DatabaseProxy(Proxy): + def connection_context(self) -> ConnectionContext: ... + def atomic(self, *args: object, **kwargs: object) -> _atomic: ... + def manual_commit(self) -> _manual: ... + def transaction(self, *args: object, **kwargs: object) -> _transaction: ... + def savepoint(self) -> _savepoint: ... + +class ModelDescriptor: ... + +# SQL Generation. + +class AliasManager: + @property + def mapping(self) -> MutableMapping[Source, str]: ... + def add(self, source: Source) -> str: ... + def get(self, source: Source, any_depth: bool = ...) -> str: ... + def __getitem__(self, source: Source) -> str: ... + def __setitem__(self, source: Source, alias: str) -> None: ... + def push(self) -> None: ... + def pop(self) -> None: ... + +class __State(NamedTuple): + scope: int + parentheses: bool + # From the source code we know this to be a dict and not just a MutableMapping. + settings: dict[str, Any] + +class State(__State): + subquery: object # TODO (dargueta) + def __new__(cls: type[Self], scope: int = ..., parentheses: bool = ..., **kwargs: object) -> Self: ... + def __call__(self, scope: int | None = ..., parentheses: int | None = ..., **kwargs: object) -> State: ... + def __getattr__(self, attr_name: str) -> Any: ... + +class Context: + stack: list[State] + alias_manager: AliasManager + state: State + def __init__(self, **settings: object) -> None: ... + def as_new(self) -> Context: ... + def column_sort_key(self, item: Sequence[ColumnBase | Source]) -> tuple[str, ...]: ... + @property + def scope(self) -> int: ... + @property + def parentheses(self) -> bool: ... + @property + def subquery(self) -> Any: ... # TODO (dargueta): Figure out type of "self.state.subquery" + def __call__(self: Self, **overrides: object) -> Self: ... + def scope_normal(self) -> AbstractContextManager[Context]: ... + def scope_source(self) -> AbstractContextManager[Context]: ... + def scope_values(self) -> AbstractContextManager[Context]: ... + def scope_cte(self) -> AbstractContextManager[Context]: ... + def scope_column(self) -> AbstractContextManager[Context]: ... + def __enter__(self: Self) -> Self: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + # @contextmanager + def push_alias(self) -> Iterator[None]: ... + # TODO (dargueta): Is this right? + def sql(self, obj: object) -> Context: ... + def literal(self: Self, keyword: str) -> Self: ... + def value(self, value: object, converter: _ConvFunc | None = ..., add_param: bool = ...) -> Context: ... + def __sql__(self, ctx: Context) -> Context: ... + def parse(self, node: Node) -> tuple[str, tuple[Incomplete, ...] | None]: ... + def query(self) -> tuple[str, tuple[Incomplete, ...] | None]: ... + +def query_to_string(query: Node) -> str: ... + +class Node: + def clone(self) -> Node: ... + def __sql__(self, ctx: Context) -> Context: ... + # FIXME (dargueta): Is there a way to make this a proper decorator? + @staticmethod + def copy(method: Callable[_P, Any]) -> Callable[Concatenate[_T, _P], _T]: ... + def coerce(self: Self, _coerce: bool = ...) -> Self: ... + def is_alias(self) -> bool: ... + def unwrap(self: Self) -> Self: ... + +class ColumnFactory: + node: Node + def __init__(self, node: Node): ... + def __getattr__(self, attr: str) -> Column: ... + +class _DynamicColumn: + @overload + def __get__(self, instance: None, instance_type: type) -> _DynamicColumn: ... + @overload + def __get__(self, instance: _T, instance_type: type[_T]) -> ColumnFactory: ... + +class _ExplicitColumn: + @overload + def __get__(self, instance: None, instance_type: type) -> _ExplicitColumn: ... + @overload + def __get__(self, instance: _T, instance_type: type[_T]) -> NoReturn: ... + +class _SupportsAlias(Protocol): + def alias(self: Self, name: str) -> Self: ... + +class Source(_SupportsAlias, Node): + c: ClassVar[_DynamicColumn] + def __init__(self, alias: str | None = ...): ... + def select(self, *columns: Field) -> Select: ... + def join(self, dest, join_type: int = ..., on: Expression | None = ...) -> Join: ... + def left_outer_join(self, dest, on: Expression | None = ...) -> Join: ... + def cte(self, name: str, recursive: bool = ..., columns=..., materialized=...) -> CTE: ... # incomplete + def get_sort_key(self, ctx) -> tuple[str, ...]: ... + def apply_alias(self, ctx: Context) -> Context: ... + def apply_column(self, ctx: Context) -> Context: ... + +class _HashableSource(_SupportsAlias): + def __init__(self, *args: object, **kwargs: object): ... + def __hash__(self) -> int: ... + # The overrides here are unfortunately a necessary evil. The __eq__/__ne__ methods + # return different types depending on the type of the argument, and both differ + # from `object`'s signature. + @overload # type: ignore + def __eq__(self, other: _HashableSource) -> bool: ... # type: ignore + @overload + def __eq__(self, other: object) -> Expression: ... # type: ignore + @overload # type: ignore + def __ne__(self, other: _HashableSource) -> bool: ... # type: ignore + @overload + def __ne__(self, other: object) -> Expression: ... # type: ignore + def __lt__(self, other: object) -> Expression: ... + def __le__(self, other: object) -> Expression: ... + def __gt__(self, other: object) -> Expression: ... + def __ge__(self, other: object) -> Expression: ... + +def __join__(join_type: int = ..., inverted: bool = ...) -> Callable[[Any, Any], Join]: ... + +class BaseTable(Source): + def __and__(self, other: object) -> Join: ... + def __add__(self, other: object) -> Join: ... + def __sub__(self, other: object) -> Join: ... + def __or__(self, other: object) -> Join: ... + def __mul__(self, other: object) -> Join: ... + def __rand__(self, other: object) -> Join: ... + def __radd__(self, other: object) -> Join: ... + def __rsub__(self, other: object) -> Join: ... + def __ror__(self, other: object) -> Join: ... + def __rmul__(self, other: object) -> Join: ... + +class _BoundTableContext(_callable_context_manager): + table: Table + database: Database + def __init__(self, table: Table, database: Database): ... + def __enter__(self) -> Table: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + +class Table(_HashableSource, BaseTable): + __name__: str + # mypy doesn't seem to understand very well how descriptors work + c: _ExplicitColumn # type: ignore[misc, assignment] + primary_key: Field | CompositeKey | None + def __init__( + self, + name: str, + columns: Iterable[str] | None = ..., + primary_key: Field | CompositeKey | None = ..., + schema: str | None = ..., + alias: str | None = ..., + _model: type[Model] | None = ..., + _database: Database | None = ..., + ): ... + def clone(self: Self) -> Self: ... + def bind(self: Self, database: Database | None = ...) -> Self: ... + def bind_ctx(self, database: Database | None = ...) -> _BoundTableContext: ... + def select(self, *columns: Column) -> Select: ... + @overload + def insert(self, insert: Select | None, columns: Sequence[str | Field | Column]) -> Insert: ... + @overload + def insert(self, insert: Mapping[str, object] | Iterable[Mapping[str, object]], **kwargs: object): ... + @overload + def replace(self, insert: Select | None, columns: Sequence[str | Field | Column]) -> Insert: ... + @overload + def replace(self, insert: Mapping[str, object] | Iterable[Mapping[str, object]], **kwargs: object): ... + def update(self, update: Mapping[str, object] | None = ..., **kwargs: object) -> Update: ... + def delete(self) -> Delete: ... + def __sql__(self, ctx: Context) -> Context: ... + +class Join(BaseTable): + lhs: Any # TODO (dargueta) + rhs: Any # TODO (dargueta) + join_type: int + def __init__(self, lhs, rhs, join_type: int = ..., on: Expression | None = ..., alias: str | None = ...): ... + def on(self: Self, predicate: Expression) -> Self: ... + def __sql__(self, ctx: Context) -> Context: ... + +class ValuesList(_HashableSource, BaseTable): + def __init__(self, values, columns=..., alias: str | None = ...): ... # incomplete + # FIXME (dargueta) `names` might be wrong + def columns(self: Self, *names: str) -> Self: ... + def __sql__(self, ctx: Context) -> Context: ... + +class CTE(_HashableSource, Source): + def __init__( + self, + name: str, + query: Select, + recursive: bool = ..., + columns: Iterable[Column | Field | str] | None = ..., + materialized: bool = ..., + ): ... + # TODO (dargueta): Is `columns` just for column names? + def select_from(self, *columns: Column | Field) -> Select: ... + def _get_hash(self) -> int: ... + def union_all(self, rhs) -> CTE: ... + __add__ = union_all + def union(self, rhs: SelectQuery) -> CTE: ... + __or__ = union + def __sql__(self, ctx: Context) -> Context: ... + +class ColumnBase(Node): + _converter: _ConvFunc | None + def converter(self: Self, converter: _ConvFunc | None = ...) -> Self: ... + @overload + def alias(self: Self, alias: None) -> Self: ... + @overload + def alias(self, alias: str) -> Alias: ... + def unalias(self: Self) -> Self: ... + def cast(self, as_type: str) -> Cast: ... + def asc(self, collation: str | None = ..., nulls: str | None = ...) -> _SupportsSQLOrdering: ... + __pos__ = asc + def desc(self, collation: str | None = ..., nulls: str | None = ...) -> _SupportsSQLOrdering: ... + __neg__ = desc + # TODO (dargueta): This always returns Negated but subclasses can return something else + def __invert__(self) -> WrappedNode[Incomplete]: ... + def __and__(self, other: object) -> Expression: ... + def __or__(self, other: object) -> Expression: ... + def __add__(self, other: object) -> Expression: ... + def __sub__(self, other: object) -> Expression: ... + def __mul__(self, other: object) -> Expression: ... + def __div__(self, other: object) -> Expression: ... + def __truediv__(self, other: object) -> Expression: ... + def __xor__(self, other: object) -> Expression: ... + def __radd__(self, other: object) -> Expression: ... + def __rsub__(self, other: object) -> Expression: ... + def __rmul__(self, other: object) -> Expression: ... + def __rdiv__(self, other: object) -> Expression: ... + def __rtruediv__(self, other: object) -> Expression: ... + def __rand__(self, other: object) -> Expression: ... + def __ror__(self, other: object) -> Expression: ... + def __rxor__(self, other: object) -> Expression: ... + def __eq__(self, rhs: object) -> Expression: ... # type: ignore[override] + def __ne__(self, rhs: object) -> Expression: ... # type: ignore[override] + def __lt__(self, other: object) -> Expression: ... + def __le__(self, other: object) -> Expression: ... + def __gt__(self, other: object) -> Expression: ... + def __ge__(self, other: object) -> Expression: ... + def __lshift__(self, other: object) -> Expression: ... + def __rshift__(self, other: object) -> Expression: ... + def __mod__(self, other: object) -> Expression: ... + def __pow__(self, other: object) -> Expression: ... + def bin_and(self, other: object) -> Expression: ... + def bin_or(self, other: object) -> Expression: ... + def in_(self, other: object) -> Expression: ... + def not_in(self, other: object) -> Expression: ... + def regexp(self, other: object) -> Expression: ... + def is_null(self, is_null: bool = ...) -> Expression: ... + def contains(self, rhs: Node | str) -> Expression: ... + def startswith(self, rhs: Node | str) -> Expression: ... + def endswith(self, rhs: Node | str) -> Expression: ... + def between(self, lo: object, hi: object) -> Expression: ... + def concat(self, rhs: object) -> StringExpression: ... + def iregexp(self, rhs: object) -> Expression: ... + def __getitem__(self, item: object) -> Expression: ... + def distinct(self) -> NodeList: ... + def collate(self, collation: str) -> NodeList: ... + def get_sort_key(self, ctx: Context) -> tuple[str, ...]: ... + +class Column(ColumnBase): + source: Source + name: str + def __init__(self, source: Source, name: str): ... + def get_sort_key(self, ctx: Context) -> tuple[str, ...]: ... + def __hash__(self) -> int: ... + def __sql__(self, ctx: Context) -> Context: ... + +class WrappedNode(ColumnBase, Generic[_NodeT]): + node: _NodeT + _coerce: bool + _converter: _ConvFunc | None + def __init__(self, node: _NodeT): ... + def is_alias(self) -> bool: ... + def unwrap(self) -> _NodeT: ... + +class EntityFactory: + node: Node + def __init__(self, node: Node): ... + def __getattr__(self, attr: str) -> Entity: ... + +class _DynamicEntity: + @overload + def __get__(self, instance: None, instance_type: type) -> _DynamicEntity: ... + @overload + def __get__(self, instance: _T, instance_type: type[_T]) -> EntityFactory: ... + +class Alias(WrappedNode[Incomplete]): + c: ClassVar[_DynamicEntity] + def __init__(self, node: Node, alias: str): ... + def __hash__(self) -> int: ... + @overload + def alias(self, alias: None) -> Node: ... + @overload + def alias(self, alias: str) -> Alias: ... + def unalias(self) -> Node: ... + def is_alias(self) -> Literal[True]: ... + def __sql__(self, ctx: Context) -> Context: ... + +class Negated(WrappedNode[Incomplete]): + def __invert__(self) -> Node: ... + def __sql__(self, ctx: Context) -> Context: ... + +class BitwiseMixin: + def __and__(self, other: object) -> Expression: ... + def __or__(self, other: object) -> Expression: ... + def __sub__(self, other: object) -> Expression: ... + def __invert__(self) -> BitwiseNegated: ... + +class BitwiseNegated(BitwiseMixin, WrappedNode[Incomplete]): + def __invert__(self) -> Node: ... + def __sql__(self, ctx: Context) -> Context: ... + +class Value(ColumnBase): + value: object + converter: _ConvFunc | None + multi: bool + def __init__(self, value: object, converter: _ConvFunc | None = ..., unpack: bool = ...): ... + def __sql__(self, ctx: Context) -> Context: ... + +def AsIs(value: object) -> Value: ... + +class Cast(WrappedNode[Incomplete]): + def __init__(self, node: Node, cast: str): ... + def __sql__(self, ctx: Context) -> Context: ... + +class Ordering(WrappedNode[Incomplete]): + direction: str + collation: str | None + nulls: str | None + def __init__(self, node: Node, direction: str, collation: str | None = ..., nulls: str | None = ...): ... + def collate(self, collation: str | None = ...) -> Ordering: ... # type: ignore[override] + def __sql__(self, ctx: Context) -> Context: ... + +class _SupportsSQLOrdering(Protocol): + def __call__(self, node: Node, collation: str | None = ..., nulls: str | None = ...) -> Ordering: ... + +def Asc(node: Node, collation: str | None = ..., nulls: str | None = ...) -> Ordering: ... +def Desc(node: Node, collation: str | None = ..., nulls: str | None = ...) -> Ordering: ... + +class Expression(ColumnBase): + lhs: Node | str | None + op: int + rhs: Node | str | None + flat: bool + def __init__(self, lhs: Node | str | None, op: int, rhs: Node | str | None, flat: bool = ...): ... + def __sql__(self, ctx: Context) -> Context: ... + +class StringExpression(Expression): + def __add__(self, rhs: object) -> StringExpression: ... + def __radd__(self, lhs: object) -> StringExpression: ... + +class Entity(ColumnBase): + def __init__(self, *path: str): ... + def __getattr__(self, attr: str) -> Entity: ... + def get_sort_key(self, ctx: Context) -> tuple[str, ...]: ... + def __hash__(self) -> int: ... + def __sql__(self, ctx: Context) -> Context: ... + +class SQL(ColumnBase): + sql: str + params: Mapping[str, object] | None + def __init__(self, sql: str, params: Mapping[str, object] = ...): ... + def __sql__(self, ctx: Context) -> Context: ... + +def Check(constraint: str) -> SQL: ... + +class Function(ColumnBase): + name: str + arguments: tuple[Incomplete, ...] + def __init__( + self, name: str, arguments: tuple[Incomplete, ...], coerce: bool = ..., python_value: _ConvFunc | None = ... + ): ... + def __getattr__(self, attr: str) -> Callable[..., Function]: ... + # TODO (dargueta): `where` is an educated guess + def filter(self: Self, where: Expression | None = ...) -> Self: ... + def order_by(self: Self, *ordering: Field | Expression) -> Self: ... + def python_value(self: Self, func: _ConvFunc | None = ...) -> Self: ... + def over( + self, + partition_by: Sequence[Field] | Window | None = ..., + order_by: Sequence[Field | Expression] | None = ..., + start: str | SQL | None = ..., + end: str | SQL | None = ..., + frame_type: str | None = ..., + window: Window | None = ..., + exclude: SQL | None = ..., + ) -> NodeList: ... + def __sql__(self, ctx: Context) -> Context: ... + +fn: Function + +class Window(Node): + CURRENT_ROW: ClassVar[SQL] + GROUP: ClassVar[SQL] + TIES: ClassVar[SQL] + NO_OTHERS: ClassVar[SQL] + GROUPS: ClassVar[str] + RANGE: ClassVar[str] + ROWS: ClassVar[str] + # Instance variables + partition_by: tuple[Field | Expression, ...] + order_by: tuple[Field | Expression, ...] + start: str | SQL | None + end: str | SQL | None + frame_type: Incomplete | None + @overload + def __init__( + self, + partition_by: Sequence[Field] | Window | None = ..., + order_by: Sequence[Field | Expression] | None = ..., + start: str | SQL | None = ..., + end: None = ..., + frame_type: str | None = ..., + extends: Window | WindowAlias | str | None = ..., + exclude: SQL | None = ..., + alias: str | None = ..., + _inline: bool = ..., + ): ... + @overload + def __init__( + self, + partition_by: Sequence[Field] | Window | None = ..., + order_by: Sequence[Field | Expression] | None = ..., + start: str | SQL = ..., + end: str | SQL = ..., + frame_type: str | None = ..., + extends: Window | WindowAlias | str | None = ..., + exclude: SQL | None = ..., + alias: str | None = ..., + _inline: bool = ..., + ): ... + def alias(self: Self, alias: str | None = ...) -> Self: ... + def as_range(self: Self) -> Self: ... + def as_rows(self: Self) -> Self: ... + def as_groups(self: Self) -> Self: ... + def extends(self: Self, window: Window | WindowAlias | str | None = ...) -> Self: ... + def exclude(self: Self, frame_exclusion: str | SQL | None = ...) -> Self: ... + @staticmethod + def following(value: int | None = ...) -> SQL: ... + @staticmethod + def preceding(value: int | None = ...) -> SQL: ... + def __sql__(self, ctx: Context) -> Context: ... + +class WindowAlias(Node): + window: Window + def __init__(self, window: Window): ... + def alias(self: Self, window_alias: str) -> Self: ... + def __sql__(self, ctx: Context) -> Context: ... + +class ForUpdate(Node): + def __init__( + self, + expr: Literal[True] | str, + of: _TModelOrTable | list[_TModelOrTable] | set[_TModelOrTable] | tuple[_TModelOrTable, ...] | None = ..., + nowait: bool | None = ..., + ): ... + def __sql__(self, ctx: Context) -> Context: ... + +def Case(predicate: Node | None, expression_tuples: Iterable[tuple[Expression, Any]], default: object = ...) -> NodeList: ... + +class NodeList(ColumnBase): + # TODO (dargueta): Narrow this type + nodes: Sequence[Incomplete] + glue: str + parens: bool + def __init__(self, nodes: Sequence[Incomplete], glue: str = ..., parens: bool = ...): ... + def __sql__(self, ctx: Context) -> Context: ... + +def CommaNodeList(nodes: Sequence[Incomplete]) -> NodeList: ... +def EnclosedNodeList(nodes: Sequence[Incomplete]) -> NodeList: ... + +class _Namespace(Node): + def __init__(self, name: str): ... + def __getattr__(self, attr: str) -> NamespaceAttribute: ... + def __getitem__(self, attr: str) -> NamespaceAttribute: ... + +class NamespaceAttribute(ColumnBase): + def __init__(self, namespace: _Namespace, attribute: str): ... + def __sql__(self, ctx: Context) -> Context: ... + +EXCLUDED: _Namespace + +class DQ(ColumnBase): + query: dict[str, Any] + + # TODO (dargueta): Narrow this down? + def __init__(self, **query: object): ... + def __invert__(self: Self) -> Self: ... + def clone(self: Self) -> Self: ... + +class QualifiedNames(WrappedNode[Incomplete]): + def __sql__(self, ctx: Context) -> Context: ... + +@overload +def qualify_names(node: Expression) -> Expression: ... +@overload +def qualify_names(node: ColumnBase) -> QualifiedNames: ... +@overload +def qualify_names(node: _T) -> _T: ... + +class OnConflict(Node): + @overload + def __init__( + self, + action: str | None = ..., + update: Mapping[str, object] | None = ..., + preserve: Field | Iterable[Field] | None = ..., + where: Expression | None = ..., + conflict_target: Field | Sequence[Field] | None = ..., + conflict_where: None = ..., + conflict_constraint: str | None = ..., + ): ... + @overload + def __init__( + self, + action: str | None = ..., + update: Mapping[str, object] | None = ..., + preserve: Field | Iterable[Field] | None = ..., + where: Expression | None = ..., + conflict_target: None = ..., + conflict_where: Expression | None = ..., + conflict_constraint: str | None = ..., + ): ... + # undocumented + def get_conflict_statement(self, ctx: Context, query: Query) -> SQL | None: ... + def get_conflict_update(self, ctx: Context, query: Query) -> NodeList: ... + def preserve(self: Self, *columns: Column) -> Self: ... + # Despite the argument name `_data` is documented + def update(self: Self, _data: Mapping[str, object] | None = ..., **kwargs: object) -> Self: ... + def where(self: Self, *expressions: Expression) -> Self: ... + def conflict_target(self: Self, *constraints: Column) -> Self: ... + def conflict_where(self: Self, *expressions: Expression) -> Self: ... + def conflict_constraint(self: Self, constraint: str) -> Self: ... + +class BaseQuery(Node): + default_row_type: ClassVar[int] + def __init__(self, _database: Database | None = ..., **kwargs: object): ... + def bind(self: Self, database: Database | None = ...) -> Self: ... + def clone(self: Self) -> Self: ... + def dicts(self: Self, as_dict: bool = ...) -> Self: ... + def tuples(self: Self, as_tuple: bool = ...) -> Self: ... + def namedtuples(self: Self, as_namedtuple: bool = ...) -> Self: ... + def objects(self: Self, constructor: _ConvFunc | None = ...) -> Self: ... + def __sql__(self, ctx: Context) -> Context: ... + def sql(self) -> tuple[str, tuple[Incomplete, ...] | None]: ... + def execute(self, database: Database | None = ...) -> CursorWrapper[Incomplete]: ... + def iterator(self, database: Database | None = ...) -> Iterator[Incomplete]: ... + def __iter__(self) -> Iterator[Any]: ... + @overload + def __getitem__(self, value: int) -> Any: ... + @overload + def __getitem__(self, value: slice) -> Sequence[Any]: ... + def __len__(self) -> int: ... + +class RawQuery(BaseQuery): + # TODO (dargueta): `tuple` may not be 100% accurate, maybe Sequence[object]? + def __init__(self, sql: str | None = ..., params: tuple[Incomplete, ...] | None = ..., **kwargs: object): ... + def __sql__(self, ctx: Context) -> Context: ... + +class Query(BaseQuery): + # TODO (dargueta): Verify type of order_by + def __init__( + self, + where: Expression | None = ..., + order_by: Sequence[Node] | None = ..., + limit: int | None = ..., + offset: int | None = ..., + **kwargs: object, + ): ... + def with_cte(self: Self, *cte_list: CTE) -> Self: ... + def where(self: Self, *expressions: Expression) -> Self: ... + def orwhere(self: Self, *expressions: Expression) -> Self: ... + def order_by(self: Self, *values: Node) -> Self: ... + def order_by_extend(self: Self, *values: Node) -> Self: ... + def limit(self: Self, value: int | None = ...) -> Self: ... + def offset(self: Self, value: int | None = ...) -> Self: ... + def paginate(self: Self, page: int, paginate_by: int = ...) -> Self: ... + def _apply_ordering(self, ctx: Context) -> Context: ... + def __sql__(self, ctx: Context) -> Context: ... + +def __compound_select__(operation: str, inverted: bool = ...) -> Callable[[Any, Any], CompoundSelectQuery]: ... + +class SelectQuery(Query): + def union_all(self, other: object) -> CompoundSelectQuery: ... + def union(self, other: object) -> CompoundSelectQuery: ... + def intersect(self, other: object) -> CompoundSelectQuery: ... + def except_(self, other: object) -> CompoundSelectQuery: ... + def __add__(self, other: object) -> CompoundSelectQuery: ... + def __or__(self, other: object) -> CompoundSelectQuery: ... + def __and__(self, other: object) -> CompoundSelectQuery: ... + def __sub__(self, other: object) -> CompoundSelectQuery: ... + def __radd__(self, other: object) -> CompoundSelectQuery: ... + def __ror__(self, other: object) -> CompoundSelectQuery: ... + def __rand__(self, other: object) -> CompoundSelectQuery: ... + def __rsub__(self, other: object) -> CompoundSelectQuery: ... + def select_from(self, *columns: Field) -> Select: ... + +class SelectBase(_HashableSource, Source, SelectQuery): + @overload + def peek(self, database: Database | None = ..., n: Literal[1] = ...) -> object: ... + @overload + def peek(self, database: Database | None = ..., n: int = ...) -> list[object]: ... + @overload + def first(self, database: Database | None = ..., n: Literal[1] = ...) -> object: ... + @overload + def first(self, database: Database | None = ..., n: int = ...) -> list[object]: ... + @overload + def scalar(self, database: Database | None = ..., as_tuple: Literal[False] = ...) -> object: ... + @overload + def scalar(self, database: Database | None = ..., as_tuple: Literal[True] = ...) -> tuple[Incomplete, ...]: ... + def count(self, database: Database | None = ..., clear_limit: bool = ...) -> int: ... + def exists(self, database: Database | None = ...) -> bool: ... + def get(self, database: Database | None = ...) -> object: ... + +# QUERY IMPLEMENTATIONS. + +class CompoundSelectQuery(SelectBase): + lhs: Any # TODO (dargueta) + op: str + rhs: Any # TODO (dargueta) + def __init__(self, lhs: object, op: str, rhs: object): ... + def exists(self, database: Database | None = ...) -> bool: ... + def __sql__(self, ctx: Context) -> Context: ... + +class Select(SelectBase): + def __init__( + self, + from_list: Sequence[Column | Field] | None = ..., # TODO (dargueta): `Field` might be wrong + columns: Iterable[Column | Field] | None = ..., # TODO (dargueta): `Field` might be wrong + # Docs say this is a "[l]ist of columns or values to group by" so we don't have + # a whole lot to restrict this to thanks to "or values" + group_by: Sequence[object] = ..., + having: Expression | None = ..., + distinct: bool | Sequence[Column] | None = ..., + windows: Container[Window] | None = ..., + for_update: bool | str | None = ..., + for_update_of: Table | Iterable[Table] | None = ..., + nowait: bool | None = ..., + lateral: bool | None = ..., # undocumented + **kwargs: object, + ): ... + def clone(self: Self) -> Self: ... + # TODO (dargueta) `Field` might be wrong in this union + def columns(self: Self, *columns: Column | Field, **kwargs: object) -> Self: ... + select = columns # type: ignore + def select_extend(self: Self, *columns: Incomplete) -> Self: ... + # TODO (dargueta): Is `sources` right? + def from_(self: Self, *sources: Source | type[Model]) -> Self: ... + def join(self: Self, dest: type[Model], join_type: int = ..., on: Expression | None = ...) -> Self: ... + def group_by(self: Self, *columns: Table | Field) -> Self: ... + def group_by_extend(self: Self, *values: Table | Field) -> Self: ... + def having(self: Self, *expressions: Expression) -> Self: ... + @overload + def distinct(self: Self, _: bool) -> Self: ... + @overload + def distinct(self: Self, *columns: Field) -> Self: ... + def window(self: Self, *windows: Window) -> Self: ... + def for_update( + self: Self, for_update: bool = ..., of: Table | Iterable[Table] | None = ..., nowait: bool | None = ... + ) -> Self: ... + def lateral(self: Self, lateral: bool = ...) -> Self: ... + +class _WriteQuery(Query): + table: Table + def __init__(self, table: Table, returning: Iterable[type[Model] | Field] | None = ..., **kwargs: object): ... + def returning(self: Self, *returning: type[Model] | Field) -> Self: ... + def apply_returning(self, ctx: Context) -> Context: ... + def execute_returning(self, database: Database) -> CursorWrapper[Incomplete]: ... + def handle_result(self, database: Database, cursor: __ICursor) -> int | __ICursor: ... + def __sql__(self, ctx: Context) -> Context: ... + +class Update(_WriteQuery): + # TODO (dargueta): `update` + def __init__(self, table: Table, update: Any | None = ..., **kwargs: object): ... + def from_(self: Self, *sources) -> Self: ... + def __sql__(self, ctx: Context) -> Context: ... + +class Insert(_WriteQuery): + SIMPLE: ClassVar[int] + QUERY: ClassVar[int] + MULTI: ClassVar[int] + DefaultValuesException: type[Exception] + def __init__( + self, + table: Table, + insert: Mapping[str, object] | Iterable[Mapping[str, object]] | SelectQuery | SQL | None = ..., + columns: Iterable[str | Field] | None = ..., # FIXME: Might be `Column` not `Field` + on_conflict: OnConflict | None = ..., + **kwargs: object, + ): ... + def where(self, *expressions: Expression) -> NoReturn: ... + def on_conflict_ignore(self: Self, ignore: bool = ...) -> Self: ... + def on_conflict_replace(self: Self, replace: bool = ...) -> Self: ... + def on_conflict(self: Self, *args, **kwargs) -> Self: ... + def get_default_data(self) -> Mapping[str, object]: ... + def get_default_columns(self) -> list[Field] | None: ... + def __sql__(self, ctx: Context) -> Context: ... + def handle_result(self, database: Database, cursor: __ICursor) -> __ICursor | int: ... + +class Delete(_WriteQuery): + def __sql__(self, ctx: Context) -> Context: ... + +class Index(Node): + def __init__( + self, + name: str, + table, + expressions, + unique: bool = ..., + safe: bool = ..., + where: Expression | None = ..., + using: str | None = ..., + ): ... + def safe(self: Self, _safe: bool = ...) -> Self: ... + def where(self: Self, *expressions: Expression) -> Self: ... + def using(self: Self, _using: str | None = ...) -> Self: ... + def __sql__(self, ctx: Context) -> Context: ... + +class ModelIndex(Index): + def __init__( + self, + model: type[_ModelT], + fields: Iterable[Field | Node | str], + unique: bool = ..., + safe: bool = ..., + where: Expression | None = ..., + using: str | None = ..., + name: str | None = ..., + ): ... + +class PeeweeException(Exception): + # This attribute only exists if an exception was passed into the constructor. + # Attempting to access it otherwise will result in an AttributeError. + orig: Exception + def __init__(self, *args: object): ... + +class ImproperlyConfigured(PeeweeException): ... +class DatabaseError(PeeweeException): ... +class DataError(DatabaseError): ... +class IntegrityError(DatabaseError): ... +class InterfaceError(PeeweeException): ... +class InternalError(DatabaseError): ... +class NotSupportedError(DatabaseError): ... +class OperationalError(DatabaseError): ... +class ProgrammingError(DatabaseError): ... + +class ExceptionWrapper: + exceptions: Mapping[str, type[Exception]] + def __init__(self, exceptions: Mapping[str, type[Exception]]): ... + def __enter__(self) -> None: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: object) -> None: ... + +EXCEPTIONS: Mapping[str, type[Exception]] + +class IndexMetadata(NamedTuple): + name: str + sql: str + columns: list[str] + unique: bool + table: str + +class ColumnMetadata(NamedTuple): + name: str + data_type: str + null: bool + primary_key: bool + table: str + default: object + +class ForeignKeyMetadata(NamedTuple): + column: str + dest_table: str + dest_column: str + table: str + +class ViewMetadata(NamedTuple): + name: str + sql: str + +class _ConnectionState: + closed: bool + conn: __IConnection | None + ctx: list[ConnectionContext] + transactions: list[_manual | _transaction] + def reset(self) -> None: ... + def set_connection(self, conn: __IConnection) -> None: ... + +class _ConnectionLocal(_ConnectionState, threading.local): ... + +class ConnectionContext(_callable_context_manager): + db: Database + def __enter__(self) -> None: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + +class Database(_callable_context_manager): + context_class: ClassVar[type[Incomplete]] + field_types: ClassVar[Mapping[str, str]] + operations: ClassVar[Mapping[str, Any]] # TODO (dargueta) Verify k/v types + param: ClassVar[str] + quote: ClassVar[str] + server_version: ClassVar[int | tuple[int, ...] | None] + commit_select: ClassVar[bool] + compound_select_parentheses: ClassVar[int] + for_update: ClassVar[bool] + index_schema_prefix: ClassVar[bool] + index_using_precedes_table: ClassVar[bool] + limit_max: ClassVar[int | None] + nulls_ordering: ClassVar[bool] + returning_clause: ClassVar[bool] + safe_create_index: ClassVar[bool] + safe_drop_index: ClassVar[bool] + sequences: ClassVar[bool] + truncate_table: ClassVar[bool] + # Instance variables + database: __IConnection + deferred: bool + autoconnect: bool + autorollback: bool + thread_safe: bool + connect_params: Mapping[str, Any] + def __init__( + self, + database: __IConnection, + thread_safe: bool = ..., + autorollback: bool = ..., + field_types: Mapping[str, str] | None = ..., + operations: Mapping[str, str] | None = ..., + autocommit: bool = ..., + autoconnect: bool = ..., + **kwargs: object, + ): ... + def init(self, database: __IConnection, **kwargs: Any) -> None: ... + def __enter__(self: Self) -> Self: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + def connection_context(self) -> ConnectionContext: ... + def connect(self, reuse_if_open: bool = ...) -> bool: ... + def close(self) -> bool: ... + def is_closed(self) -> bool: ... + def is_connection_usable(self) -> bool: ... + def connection(self) -> __IConnection: ... + def cursor(self, commit: bool | None = ...) -> __ICursor: ... + def execute_sql( + self, sql: str, params: tuple[Incomplete, ...] | None = ..., commit: bool | _TSentinel = ... + ) -> __ICursor: ... + def execute(self, query: Query, commit: bool | _TSentinel = ..., **context_options: object) -> __ICursor: ... + def get_context_options(self) -> Mapping[str, object]: ... + def get_sql_context(self, **context_options: object): ... + def conflict_statement(self, on_conflict: OnConflict, query: Query) -> SQL | None: ... + def conflict_update(self, oc: OnConflict, query: Query) -> NodeList: ... + def last_insert_id(self, cursor: __ICursor, query_type: int | None = ...) -> int: ... + def rows_affected(self, cursor: __ICursor) -> int: ... + def default_values_insert(self, ctx: Context) -> Context: ... + def session_start(self) -> _transaction: ... + def session_commit(self) -> bool: ... + def session_rollback(self) -> bool: ... + def in_transaction(self) -> bool: ... + def push_transaction(self, transaction) -> None: ... + def pop_transaction(self) -> _manual | _transaction: ... + def transaction_depth(self) -> int: ... + def top_transaction(self) -> _manual | _transaction | None: ... + def atomic(self, *args: object, **kwargs: object) -> _atomic: ... + def manual_commit(self) -> _manual: ... + def transaction(self, *args: object, **kwargs: object) -> _transaction: ... + def savepoint(self) -> _savepoint: ... + def begin(self) -> None: ... + def commit(self) -> None: ... + def rollback(self) -> None: ... + def batch_commit(self, it: Iterable[_T], n: int) -> Iterator[_T]: ... + def table_exists(self, table_name: str, schema: str | None = ...) -> str: ... + def get_tables(self, schema: str | None = ...) -> list[str]: ... + def get_indexes(self, table: str, schema: str | None = ...) -> list[IndexMetadata]: ... + def get_columns(self, table: str, schema: str | None = ...) -> list[ColumnMetadata]: ... + def get_primary_keys(self, table: str, schema: str | None = ...) -> list[str]: ... + def get_foreign_keys(self, table: str, schema: str | None = ...) -> list[ForeignKeyMetadata]: ... + def sequence_exists(self, seq: str) -> bool: ... + def create_tables(self, models: Iterable[type[Model]], **options: object) -> None: ... + def drop_tables(self, models: Iterable[type[Model]], **kwargs: object) -> None: ... + def extract_date(self, date_part: str, date_field: Node) -> Function: ... + def truncate_date(self, date_part: str, date_field: Node) -> Function: ... + def to_timestamp(self, date_field: str) -> Function: ... + def from_timestamp(self, date_field: str) -> Function: ... + def random(self) -> Node: ... + def bind(self, models: Iterable[type[Model]], bind_refs: bool = ..., bind_backrefs: bool = ...) -> None: ... + def bind_ctx( + self, models: Iterable[type[Model]], bind_refs: bool = ..., bind_backrefs: bool = ... + ) -> _BoundModelsContext: ... + def get_noop_select(self, ctx: Context) -> Context: ... + +class SqliteDatabase(Database): + field_types: ClassVar[Mapping[str, int]] + operations: ClassVar[Mapping[str, str]] + index_schema_prefix: ClassVar[bool] + limit_max: ClassVar[int] + server_version: ClassVar[tuple[int, ...]] + truncate_table: ClassVar[bool] + # Instance variables + timeout: int + nulls_ordering: bool + # Properties + cache_size: int + def __init__( + self, database: str, *args: object, pragmas: Mapping[str, object] | Iterable[tuple[str, Any]] = ..., **kwargs: object + ): ... + def init( + self, + database: str, + pragmas: Mapping[str, object] | Iterable[tuple[str, Any]] | None = ..., + timeout: int = ..., + **kwargs: object, + ) -> None: ... + def pragma(self, key: str, value: str | bool | int = ..., permanent: bool = ..., schema: str | None = ...) -> object: ... + @property + def foreign_keys(self) -> Any: ... + @foreign_keys.setter + def foreign_keys(self, value: object) -> Any: ... + @property + def journal_mode(self) -> Any: ... + @journal_mode.setter + def journal_mode(self, value: object) -> Any: ... + @property + def journal_size_limit(self) -> Any: ... + @journal_size_limit.setter + def journal_size_limit(self, value: object) -> Any: ... + @property + def mmap_size(self) -> Any: ... + @mmap_size.setter + def mmap_size(self, value: object) -> Any: ... + @property + def page_size(self) -> Any: ... + @page_size.setter + def page_size(self, value: object) -> Any: ... + @property + def read_uncommitted(self) -> Any: ... + @read_uncommitted.setter + def read_uncommitted(self, value: object) -> Any: ... + @property + def synchronous(self) -> Any: ... + @synchronous.setter + def synchronous(self, value: object) -> Any: ... + @property + def wal_autocheckpoint(self) -> Any: ... + @wal_autocheckpoint.setter + def wal_autocheckpoint(self, value: object) -> Any: ... + def register_aggregate(self, klass: type[__IAggregate], name: str | None = ..., num_params: int = ...): ... + def aggregate(self, name: str | None = ..., num_params: int = ...) -> Callable[[_ClassT], _ClassT]: ... + def register_collation(self, fn: Callable[..., Incomplete], name: str | None = ...) -> None: ... + def collation(self, name: str | None = ...) -> Callable[[_FuncT], _FuncT]: ... + def register_function(self, fn: Callable[..., Incomplete], name: str | None = ..., num_params: int = ...) -> int: ... + def func(self, name: str | None = ..., num_params: int = ...) -> Callable[[_FuncT], _FuncT]: ... + def register_window_function(self, klass: type, name: str | None = ..., num_params: int = ...) -> None: ... + def window_function(self, name: str | None = ..., num_params: int = ...) -> Callable[[_ClassT], _ClassT]: ... + def register_table_function(self, klass: type[__ITableFunction], name: str | None = ...) -> None: ... + def table_function(self, name: str | None = ...) -> Callable[[type[__ITableFunction]], type[__ITableFunction]]: ... + def unregister_aggregate(self, name: str) -> None: ... + def unregister_collation(self, name: str) -> None: ... + def unregister_function(self, name: str) -> None: ... + def unregister_window_function(self, name: str) -> None: ... + def unregister_table_function(self, name: str) -> bool: ... + def load_extension(self, extension: str) -> None: ... + def unload_extension(self, extension: str) -> None: ... + def attach(self, filename: str, name: str) -> bool: ... + def detach(self, name: str) -> bool: ... + def begin(self, lock_type: str | None = ...) -> None: ... + def get_views(self, schema: str | None = ...) -> list[ViewMetadata]: ... + def get_binary_type(self) -> type: ... + +class PostgresqlDatabase(Database): + field_types: ClassVar[Mapping[str, str]] + operations: ClassVar[Mapping[str, str]] + param: ClassVar[str] + commit_select: ClassVar[bool] + compound_select_parentheses: ClassVar[int] + for_update: ClassVar[bool] + nulls_ordering: ClassVar[bool] + returning_clause: ClassVar[bool] + safe_create_index: ClassVar[bool] + sequences: ClassVar[bool] + # Instance variables + server_version: int + def init( # type: ignore[override] + self, + database: __IConnection, + register_unicode: bool = ..., + encoding: str | None = ..., + isolation_level: int | None = ..., + **kwargs: object, + ): ... + def is_connection_usable(self) -> bool: ... + def last_insert_id(self, cursor: __ICursor, query_type: int | None = ...) -> int | None | __ICursor: ... + def get_views(self, schema: str | None = ...) -> list[ViewMetadata]: ... + def get_binary_type(self) -> type: ... + def get_noop_select(self, ctx: Context) -> SelectQuery: ... + def set_time_zone(self, timezone: str) -> None: ... + +class MySQLDatabase(Database): + field_types: ClassVar[Mapping[str, str]] + operations: ClassVar[Mapping[str, str]] + param: ClassVar[str] + quote: ClassVar[str] + commit_select: ClassVar[bool] + compound_select_parentheses: ClassVar[int] + for_update: ClassVar[bool] + index_using_precedes_table: ClassVar[bool] + limit_max: ClassVar[int] + safe_create_index: ClassVar[bool] + safe_drop_index: ClassVar[bool] + sql_mode: ClassVar[str] + # Instance variables + server_version: tuple[int, ...] + def init(self, database: __IConnection, **kwargs: object): ... + def default_values_insert(self, ctx: Context) -> SQL: ... + def get_views(self, schema: str | None = ...) -> list[ViewMetadata]: ... + def get_binary_type(self) -> type: ... + # TODO (dargueta) Verify return type on these function calls + def extract_date(self, date_part: str, date_field: str) -> Function: ... + def truncate_date(self, date_part: str, date_field: str) -> Function: ... + def to_timestamp(self, date_field: str) -> Function: ... + def from_timestamp(self, date_field: str) -> Function: ... + def random(self) -> Function: ... + def get_noop_select(self, ctx: Context) -> Context: ... + +# TRANSACTION CONTROL. + +class _manual(_callable_context_manager): + db: Database + def __init__(self, db: Database): ... + def __enter__(self) -> None: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + +class _atomic(_callable_context_manager): + db: Database + def __init__(self, db: Database, *args: object, **kwargs: object): ... + def __enter__(self) -> _transaction | _savepoint: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + +class _transaction(_callable_context_manager): + db: Database + def __init__(self, db: Database, *args: object, **kwargs: object): ... + def commit(self, begin: bool = ...) -> None: ... + def rollback(self, begin: bool = ...) -> None: ... + def __enter__(self: Self) -> Self: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + +class _savepoint(_callable_context_manager): + db: Database + sid: str + quoted_sid: str + def __init__(self, db: Database, sid: str | None = ...): ... + def commit(self, begin: bool = ...) -> None: ... + def rollback(self) -> None: ... + def __enter__(self: Self) -> Self: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + +class CursorWrapper(Generic[_T]): + cursor: __ICursor + count: int + index: int + initialized: bool + populated: bool + row_cache: list[_T] + def __init__(self, cursor: __ICursor): ... + def __iter__(self) -> ResultIterator[_T] | Iterator[_T]: ... + @overload + def __getitem__(self, item: int) -> _T: ... + @overload + def __getitem__(self, item: slice) -> list[_T]: ... + def __len__(self) -> int: ... + def initialize(self) -> None: ... + def iterate(self, cache: bool = ...) -> _T: ... + def process_row(self, row: tuple[Incomplete, ...]) -> _T: ... + def iterator(self) -> Iterator[_T]: ... + def fill_cache(self, n: int = ...) -> None: ... + +class DictCursorWrapper(CursorWrapper[Mapping[str, object]]): ... + +# FIXME (dargueta): Somehow figure out how to make this a NamedTuple sorta deal +class NamedTupleCursorWrapper(CursorWrapper[tuple[Incomplete, ...]]): + tuple_class: type[tuple[Incomplete, ...]] + +class ObjectCursorWrapper(DictCursorWrapper, Generic[_T]): + constructor: Callable[..., _T] + def __init__(self, cursor: __ICursor, constructor: Callable[..., _T]) -> None: ... + def process_row(self, row: tuple) -> _T: ... # type: ignore + +class ResultIterator(Generic[_T]): + cursor_wrapper: CursorWrapper[_T] + index: int + def __init__(self, cursor_wrapper: CursorWrapper[_T]): ... + def __iter__(self) -> Iterator[_T]: ... + +# FIELDS + +class FieldAccessor: + model: type[Model] + field: Field + name: str + def __init__(self, model: type[Model], field: Field, name: str): ... + @overload + def __get__(self, instance: None, instance_type: type) -> Field: ... + @overload + def __get__(self, instance: _T, instance_type: type[_T]) -> Any: ... + +class ForeignKeyAccessor(FieldAccessor): + model: type[Model] + field: ForeignKeyField + name: str + rel_model: type[Model] + def __init__(self, model: type[Model], field: ForeignKeyField, name: str): ... + def get_rel_instance(self, instance: Model) -> Any: ... + @overload + def __get__(self, instance: None, instance_type: type) -> Any: ... + @overload + def __get__(self, instance: _ModelT, instance_type: type[_ModelT]) -> ForeignKeyField: ... + def __set__(self, instance: Model, obj: object) -> None: ... + +class NoQueryForeignKeyAccessor(ForeignKeyAccessor): + def get_rel_instance(self, instance: Model) -> Any: ... + +class BackrefAccessor: + field: ForeignKeyField + model: type[Model] + rel_model: type[Model] + def __init__(self, field: ForeignKeyField): ... + @overload + def __get__(self, instance: None, instance_type: type) -> BackrefAccessor: ... + @overload + def __get__(self, instance: Field, instance_type: type[Field]) -> SelectQuery: ... + +# "Gives direct access to the underlying id" +class ObjectIdAccessor: + field: ForeignKeyField + def __init__(self, field: ForeignKeyField): ... + @overload + def __get__(self, instance: None, instance_type: type[Model]) -> ForeignKeyField: ... + @overload + def __get__(self, instance: _ModelT, instance_type: type[_ModelT] = ...) -> Any: ... + def __set__(self, instance: Model, value: object) -> None: ... + +class Field(ColumnBase): + accessor_class: ClassVar[type[FieldAccessor]] + auto_increment: ClassVar[bool] + default_index_type: ClassVar[str | None] + field_type: ClassVar[str] + unpack: ClassVar[bool] + # Instance variables + model: type[Model] + null: bool + index: bool + unique: bool + column_name: str + default: Any + primary_key: bool + constraints: Iterable[Callable[[str], SQL] | SQL] | None + sequence: str | None + collation: str | None + unindexed: bool + help_text: str | None + verbose_name: str | None + index_type: str | None + def __init__( + self, + null: bool = ..., + index: bool = ..., + unique: bool = ..., + column_name: str = ..., + default: Any = ..., + primary_key: bool = ..., + constraints: Iterable[Callable[[str], SQL] | SQL] | None = ..., + sequence: str | None = ..., + collation: str | None = ..., + unindexed: bool | None = ..., + choices: Iterable[tuple[Any, str]] | None = ..., + help_text: str | None = ..., + verbose_name: str | None = ..., + index_type: str | None = ..., + db_column: str | None = ..., # Deprecated argument, undocumented + _hidden: bool = ..., + ): ... + def __hash__(self) -> int: ... + def bind(self, model: type[Model], name: str, set_attribute: bool = ...) -> None: ... + @property + def column(self) -> Column: ... + def adapt(self, value: _T) -> _T: ... + def db_value(self, value: _T) -> _T: ... + def python_value(self, value: _T) -> _T: ... + def to_value(self, value: Any) -> Value: ... + def get_sort_key(self, ctx: Context) -> tuple[int, int]: ... + def __sql__(self, ctx: Context) -> Context: ... + def get_modifiers(self) -> Any: ... + def ddl_datatype(self, ctx: Context) -> SQL: ... + def ddl(self, ctx: Context) -> NodeList: ... + +class IntegerField(Field): + @overload + def adapt(self, value: str | float | bool) -> int: ... # type: ignore + @overload + def adapt(self, value: _T) -> _T: ... + +class BigIntegerField(IntegerField): ... +class SmallIntegerField(IntegerField): ... + +class AutoField(IntegerField): + def __init__(self, *args: object, primary_key: bool = ..., **kwargs: object): ... + +class BigAutoField(AutoField): ... + +class IdentityField(AutoField): + def __init__(self, generate_always: bool = ..., **kwargs: object): ... + +class PrimaryKeyField(AutoField): ... + +class FloatField(Field): + @overload + def adapt(self, value: str | float | bool) -> float: ... # type: ignore + @overload + def adapt(self, value: _T) -> _T: ... + +class DoubleField(FloatField): ... + +class DecimalField(Field): + max_digits: int + decimal_places: int + auto_round: int + rounding: bool + def __init__( + self, + max_digits: int = ..., + decimal_places: int = ..., + auto_round: bool = ..., + rounding: bool = ..., + *args: object, + **kwargs: object, + ): ... + def get_modifiers(self) -> list[int]: ... + @overload + def db_value(self, value: None) -> None: ... + @overload + def db_value(self, value: float | decimal.Decimal) -> decimal.Decimal: ... # type: ignore + @overload + def db_value(self, value: _T) -> _T: ... + @overload + def python_value(self, value: None) -> None: ... + @overload + def python_value(self, value: str | float | decimal.Decimal) -> decimal.Decimal: ... + +class _StringField(Field): + def adapt(self, value: AnyStr) -> str: ... + def __add__(self, other: Any) -> StringExpression: ... + def __radd__(self, other: Any) -> StringExpression: ... + +class CharField(_StringField): + max_length: int + def __init__(self, max_length: int = ..., *args: object, **kwargs: object): ... + def get_modifiers(self) -> list[int] | None: ... + +class FixedCharField(CharField): ... +class TextField(_StringField): ... + +class BlobField(Field): + @overload + def db_value(self, value: str | bytes) -> bytearray: ... + @overload + def db_value(self, value: _T) -> _T: ... + +class BitField(BitwiseMixin, BigIntegerField): + def __init__(self, *args: object, default: int | None = ..., **kwargs: object): ... + # FIXME (dargueta) Return type isn't 100% accurate; function creates a new class + def flag(self, value: int | None = ...) -> ColumnBase: ... + +class BigBitFieldData: + name: str + instance: Model + def __init__(self, instance: Model, name: str): ... + def set_bit(self, idx: int) -> None: ... + def clear_bit(self, idx: bool) -> None: ... + def toggle_bit(self, idx: int) -> bool: ... + def is_set(self, idx: int) -> bool: ... + +class BigBitFieldAccessor(FieldAccessor): + @overload + def __get__(self, instance: None, instance_type: type[_ModelT]) -> Field: ... + @overload + def __get__(self, instance: _ModelT, instance_type: type[_ModelT]) -> BigBitFieldData: ... + def __set__(self, instance: Any, value: memoryview | bytearray | BigBitFieldData | str | bytes) -> None: ... + +class BigBitField(BlobField): + accessor_class: ClassVar[type[BigBitFieldAccessor]] + def __init__(self, *args: object, default: type = ..., **kwargs: object): ... + @overload + def db_value(self, value: None) -> None: ... + @overload + def db_value(self, value) -> bytes: ... + +class UUIDField(Field): + @overload + def db_value(self, value: AnyStr) -> str: ... + @overload + def db_value(self, value: _T) -> _T: ... + @overload + def python_value(self, value: uuid.UUID | AnyStr) -> uuid.UUID: ... + @overload + def python_value(self, value: None) -> None: ... + +class BinaryUUIDField(BlobField): + @overload + def db_value(self, value: None) -> None: ... + @overload + def db_value(self, value: bytearray | bytes | str | uuid.UUID | None) -> bytes: ... + @overload + def python_value(self, value: None) -> None: ... + @overload + def python_value(self, value: bytearray | bytes | memoryview | uuid.UUID) -> uuid.UUID: ... + +def format_date_time(value: str, formats: Iterable[str], post_process: _ConvFunc | None = ...) -> str: ... +@overload +def simple_date_time(value: str) -> datetime.datetime: ... +@overload +def simple_date_time(value: _T) -> _T: ... + +class _BaseFormattedField(Field): + # TODO (dargueta): This is a class variable that can be overridden for instances + formats: Container[str] | None + def __init__(self, formats: Container[str] | None = ..., *args: object, **kwargs: object): ... + +class DateTimeField(_BaseFormattedField): + @property + def year(self) -> int: ... + @property + def month(self) -> int: ... + @property + def day(self) -> int: ... + @property + def hour(self) -> int: ... + @property + def minute(self) -> int: ... + @property + def second(self) -> int: ... + def adapt(self, value: _T) -> _T: ... + def to_timestamp(self) -> Function: ... + def truncate(self, part: str) -> Function: ... + +class DateField(_BaseFormattedField): + @property + def year(self) -> int: ... + @property + def month(self) -> int: ... + @property + def day(self) -> int: ... + @overload + def adapt(self, value: datetime.datetime) -> datetime.date: ... + @overload + def adapt(self, value: _T) -> _T: ... + def to_timestamp(self) -> Function: ... + def truncate(self, part: str) -> Function: ... + +class TimeField(_BaseFormattedField): + @overload + def adapt(self, value: datetime.datetime | datetime.timedelta) -> datetime.time: ... + @overload + def adapt(self, value: _T) -> _T: ... + @property + def hour(self) -> int: ... + @property + def minute(self) -> int: ... + @property + def second(self) -> int: ... + +class TimestampField(BigIntegerField): + valid_resolutions: ClassVar[Container[int]] + # Instance variables + resolution: int + ticks_to_microsecond: int + utc: bool + def __init__(self, *args: object, resolution: int = ..., utc: bool = ..., **kwargs: object): ... + def local_to_utc(self, dt: datetime.datetime) -> datetime.datetime: ... + def utc_to_local(self, dt: datetime.datetime) -> datetime.datetime: ... + def get_timestamp(self, value: datetime.datetime) -> float: ... + @overload + def db_value(self, value: None) -> None: ... + @overload + def db_value(self, value: datetime.datetime | datetime.date | float) -> int: ... + @overload + def python_value(self, value: int | float) -> datetime.datetime: ... + @overload + def python_value(self, value: _T) -> _T: ... + def from_timestamp(self) -> float: ... + @property + def year(self) -> int: ... + @property + def month(self) -> int: ... + @property + def day(self) -> int: ... + @property + def hour(self) -> int: ... + @property + def minute(self) -> int: ... + @property + def second(self) -> float: ... # TODO (dargueta) Int? + +class IPField(BigIntegerField): + @overload + def db_value(self, val: str) -> int: ... + @overload + def db_value(self, val: None) -> None: ... + @overload + def python_value(self, val: int) -> str: ... + @overload + def python_value(self, val: None) -> None: ... + +class BooleanField(Field): + def adapt(self, value: Any) -> bool: ... + +class BareField(Field): + # If `adapt` was omitted from the constructor or None, this attribute won't exist. + adapt: _ConvFunc | None + def __init__(self, adapt: _ConvFunc | None = ..., *args: object, **kwargs: object): ... + def ddl_datatype(self, ctx: Context) -> None: ... + +class ForeignKeyField(Field): + accessor_class: ClassVar[type[ForeignKeyAccessor]] + rel_model: type[Model] | Literal["self"] + rel_field: Field + declared_backref: str | None + backref: str | None # TODO (dargueta): Verify + on_delete: str | None + on_update: str | None + deferrable: str | None + deferred: bool | None + object_id_name: str | None + lazy_load: bool + safe_name: str + def __init__( + self, + model: type[Model] | Literal["self"], + field: Field | None = ..., + # TODO (dargueta): Documentation says this is only a string but code accepts a callable too + backref: str | None = ..., + on_delete: str | None = ..., + on_update: str | None = ..., + deferrable: str | None = ..., + _deferred: bool | None = ..., # undocumented + rel_model: object = ..., # undocumented + to_field: object = ..., # undocumented + object_id_name: str | None = ..., + lazy_load: bool = ..., + # type for related_name is a guess + related_name: str | None = ..., # undocumented + *args: object, + index: bool = ..., + **kwargs: object, + ): ... + @property + def field_type(self) -> str: ... + def get_modifiers(self) -> Iterable[object] | None: ... + def adapt(self, value: object) -> Any: ... + def db_value(self, value: object) -> Any: ... + def python_value(self, value: object) -> Any: ... + def bind(self, model: type[Model], name: str, set_attribute: bool = ...) -> None: ... + def foreign_key_constraint(self) -> NodeList: ... + def __getattr__(self, attr: str) -> Field: ... + +class DeferredForeignKey(Field): + field_kwargs: dict[str, object] + rel_model_name: str + def __init__(self, rel_model_name: str, *, column_name: str | None = ..., null: str | None = ..., **kwargs: object): ... + def set_model(self, rel_model: type[Model]) -> None: ... + @staticmethod + def resolve(model_cls: type[Model]) -> None: ... + def __hash__(self) -> int: ... + +class DeferredThroughModel: + def set_field(self, model: type[Model], field: type[Field], name: str) -> None: ... + def set_model(self, through_model: type[Model]) -> None: ... + +class MetaField(Field): + # These are declared as class variables in the source code but are used like local + # variables + column_name: str | None + default: Any + model: type[Model] + name: str | None + primary_key: bool + +class ManyToManyFieldAccessor(FieldAccessor): + model: type[Model] + rel_model: type[Model] + through_model: type[Model] + src_fk: ForeignKeyField + dest_fk: ForeignKeyField + def __init__(self, model: type[Model], field: ForeignKeyField, name: str): ... + @overload + def __get__(self, instance: None, instance_type: type[Incomplete] = ..., force_query: bool = ...) -> Field: ... + @overload + def __get__(self, instance: _T, instance_type: type[_T] = ..., force_query: bool = ...) -> list[str] | ManyToManyQuery: ... + def __set__(self, instance, value) -> None: ... + +class ManyToManyField(MetaField): + accessor_class: ClassVar[type[ManyToManyFieldAccessor]] + # Instance variables + through_model: type[Model] | DeferredThroughModel + rel_model: type[Model] + backref: str | None + def __init__( + self, + model: type[Model], + backref: str | None = ..., + through_model: type[Model] | DeferredThroughModel | None = ..., + on_delete: str | None = ..., + on_update: str | None = ..., + _is_backref: bool = ..., + ): ... + def bind(self, model: type[Model], name: str, set_attribute: bool = ...) -> None: ... + def get_models(self) -> list[type[Model]]: ... + def get_through_model(self) -> type[Model] | DeferredThroughModel: ... + +class VirtualField(MetaField, Generic[_FieldT]): + field_class: type[_FieldT] + field_instance: _FieldT | None + def __init__(self, field_class: type[Incomplete] | None = ..., *args: object, **kwargs: object): ... + def db_value(self, value: object) -> Any: ... + def python_value(self, value: object) -> Any: ... + def bind(self, model: type[Model], name: str, set_attribute: bool = ...) -> None: ... + +class CompositeKey(MetaField): + sequence = None + field_names: tuple[str, ...] + # The following attributes are not set in the constructor an so may not always be + # present. + model: type[Model] + column_name: str + def __init__(self, *field_names: str): ... + @property + def safe_field_names(self) -> list[str] | tuple[str, ...]: ... + @overload + def __get__(self, instance: None, instance_type: type) -> CompositeKey: ... + @overload + def __get__(self, instance: _T, instance_type: type[_T]) -> tuple[Incomplete, ...]: ... + def __set__(self, instance: Model, value: list[Incomplete] | tuple[Incomplete, ...]) -> None: ... + def __eq__(self, other): ... + def __ne__(self, other): ... + def __hash__(self) -> int: ... + def __sql__(self, ctx: Context) -> Context: ... + def bind(self, model: type[Model], name: str, set_attribute: bool = ...) -> None: ... + +# MODELS + +class SchemaManager: + model: type[Model] + context_options: dict[str, object] + def __init__(self, model: type[Model], database: Database | None = ..., **context_options: object): ... + @property + def database(self) -> Database: ... + @database.setter + def database(self, value: Database | None) -> None: ... + def create_table(self, safe: bool = ..., **options: object) -> None: ... + def create_table_as(self, table_name: str, query: SelectQuery, safe: bool = ..., **meta: object) -> None: ... + def drop_table(self, safe: bool = ..., **options: object) -> None: ... + def truncate_table(self, restart_identity: bool = ..., cascade: bool = ...) -> None: ... + def create_indexes(self, safe: bool = ...) -> None: ... + def drop_indexes(self, safe: bool = ...) -> None: ... + def create_sequence(self, field: Field) -> None: ... + def drop_sequence(self, field: Field) -> None: ... + def create_foreign_key(self, field: Field) -> None: ... + def create_sequences(self) -> None: ... + def create_all(self, safe: bool = ..., **table_options: object) -> None: ... + def drop_sequences(self) -> None: ... + def drop_all(self, safe: bool = ..., drop_sequences: bool = ..., **options: object) -> None: ... + +class Metadata: + model: type[Model] + database: Database | None + fields: dict[str, object] # TODO (dargueta) This may be dict[str, Field] + columns: dict[str, object] # TODO (dargueta) Verify this + combined: dict[str, object] # TODO (dargueta) Same as above + sorted_fields: list[Field] + sorted_field_names: list[str] + defaults: dict[str, object] + name: str + table_function: Callable[[type[Model]], str] | None + legacy_table_names: bool + table_name: str + indexes: list[Index | ModelIndex | SQL] + constraints: Iterable[Callable[[str], SQL] | SQL] | None + primary_key: Literal[False] | Field | CompositeKey | None + composite_key: bool | None + auto_increment: bool | None + only_save_dirty: bool + depends_on: Sequence[type[Model]] | None + table_settings: Mapping[str, object] + temporary: bool + refs: dict[ForeignKeyField, type[Model]] + backrefs: MutableMapping[ForeignKeyField, list[type[Model]]] + model_refs: MutableMapping[type[Model], list[ForeignKeyField]] + model_backrefs: MutableMapping[ForeignKeyField, list[type[Model]]] + manytomany: dict[str, ManyToManyField] + options: Mapping[str, object] + table: Table | None + entity: Entity + def __init__( + self, + model: type[Model], + database: Database | None = ..., + table_name: str | None = ..., + indexes: Iterable[str | Sequence[str]] | None = ..., + primary_key: Literal[False] | Field | CompositeKey | None = ..., + constraints: Iterable[Callable[[str], SQL] | SQL] | None = ..., + schema: str | None = ..., + only_save_dirty: bool = ..., + depends_on: Sequence[type[Model]] | None = ..., + options: Mapping[str, object] | None = ..., + db_table: str | None = ..., + table_function: Callable[[type[Model]], str] | None = ..., + table_settings: Mapping[str, object] | None = ..., + without_rowid: bool = ..., + temporary: bool = ..., + legacy_table_names: bool = ..., + **kwargs: object, + ): ... + def make_table_name(self) -> str: ... + def model_graph( + self, refs: bool = ..., backrefs: bool = ..., depth_first: bool = ... + ) -> list[tuple[ForeignKeyField, type[Model], bool]]: ... + def add_ref(self, field: ForeignKeyField) -> None: ... + def remove_ref(self, field: ForeignKeyField) -> None: ... + def add_manytomany(self, field: ManyToManyField) -> None: ... + def remove_manytomany(self, field: ManyToManyField) -> None: ... + def get_rel_for_model( + self, model: type[Model] | ModelAlias[Incomplete] + ) -> tuple[list[ForeignKeyField], list[type[Model]]]: ... + def add_field(self, field_name: str, field: Field, set_attribute: bool = ...) -> None: ... + def remove_field(self, field_name: str) -> None: ... + def set_primary_key(self, name: str, field: Field | CompositeKey) -> None: ... + def get_primary_keys(self) -> tuple[Field, ...]: ... + def get_default_dict(self) -> dict[str, object]: ... + def fields_to_index(self) -> list[ModelIndex]: ... + def set_database(self, database: Database) -> None: ... + def set_table_name(self, table_name: str) -> None: ... + +class SubclassAwareMetadata(Metadata): + models: ClassVar[list[type[Model]]] + def __init__(self, model: type[Model], *args: object, **kwargs: object): ... + def map_models(self, fn: Callable[[type[Model]], Any]) -> None: ... + +class DoesNotExist(Exception): ... + +class ModelBase(type): + inheritable: ClassVar[set[str]] + def __iter__(self) -> Iterator[Any]: ... + def __getitem__(self, key: object) -> Model: ... + def __setitem__(self, key: object, value: Model) -> None: ... + def __delitem__(self, key: object) -> None: ... + def __contains__(self, key: object) -> bool: ... + def __len__(self) -> int: ... + def __bool__(self) -> bool: ... + def __nonzero__(self) -> bool: ... + def __sql__(self, ctx: Context) -> Context: ... + +class _BoundModelsContext(_callable_context_manager): + models: Iterable[type[Model]] + database: Database + bind_refs: bool + bind_backrefs: bool + def __init__(self, models: Iterable[type[Model]], database, bind_refs: bool, bind_backrefs: bool): ... + def __enter__(self) -> Iterable[type[Model]]: ... + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: ... + +class Model(Node, metaclass=ModelBase): + _meta: ClassVar[Metadata] + _schema: ClassVar[SchemaManager] + DoesNotExist: ClassVar[type[DoesNotExist]] + __data__: MutableMapping[str, object] + __rel__: MutableMapping[str, object] + def __init__(self, *, __no_default__: int | bool = ..., **kwargs: object): ... + @classmethod + def validate_model(cls) -> None: ... + @classmethod + def alias(cls, alias: str | None = ...) -> ModelAlias[Incomplete]: ... + @classmethod + def select(cls, *fields: Field) -> ModelSelect[Incomplete]: ... + @classmethod + def update(cls, __data: Iterable[str | Field] | None = ..., **update: Any) -> ModelUpdate: ... + @classmethod + def insert(cls, __data: Iterable[str | Field] | None = ..., **insert: Any) -> ModelInsert: ... + @overload + @classmethod + def insert_many(cls, rows: Iterable[Mapping[str, object]], fields: None) -> ModelInsert: ... + @overload + @classmethod + def insert_many(cls, rows: Iterable[tuple[Incomplete, ...]], fields: Sequence[Field]) -> ModelInsert: ... + @classmethod + def insert_from(cls, query: SelectQuery, fields: Iterable[Field | str]) -> ModelInsert: ... + @classmethod + def replace(cls, __data: Iterable[str | Field] | None = ..., **insert: object) -> OnConflict: ... + @classmethod + def replace_many(cls, rows: Iterable[tuple[Incomplete, ...]], fields: Sequence[Field] | None = ...) -> OnConflict: ... + @classmethod + def raw(cls, sql: str, *params: object) -> ModelRaw[Incomplete]: ... + @classmethod + def delete(cls) -> ModelDelete: ... + @classmethod + def create(cls: type[Self], **query) -> Self: ... + @classmethod + def bulk_create(cls, model_list: Iterable[type[Model]], batch_size: int | None = ...) -> None: ... + @classmethod + def bulk_update( + cls, model_list: Iterable[type[Model]], fields: Iterable[str | Field], batch_size: int | None = ... + ) -> int: ... + @classmethod + def noop(cls) -> NoopModelSelect: ... + @classmethod + def get(cls, *query: object, **filters: object) -> ModelSelect[Incomplete]: ... + @classmethod + def get_or_none(cls, *query: object, **filters: object) -> ModelSelect[Incomplete] | None: ... + @classmethod + def get_by_id(cls, pk: object) -> ModelSelect[Incomplete]: ... + # TODO (dargueta) I'm 99% sure of return value for this one + @classmethod + def set_by_id(cls, key, value) -> CursorWrapper[Incomplete]: ... + # TODO (dargueta) I'm also not 100% about this one's return value. + @classmethod + def delete_by_id(cls, pk: object) -> CursorWrapper[Incomplete]: ... + @classmethod + def get_or_create(cls, *, defaults: Mapping[str, object] = ..., **kwargs: object) -> tuple[Any, bool]: ... + @classmethod + def filter(cls, *dq_nodes: DQ, **filters: Any) -> SelectQuery: ... + def get_id(self) -> Any: ... + def save(self, force_insert: bool = ..., only: Iterable[str | Field] | None = ...) -> Literal[False] | int: ... + def is_dirty(self) -> bool: ... + @property + def dirty_fields(self) -> list[Field]: ... + def dependencies(self, search_nullable: bool = ...) -> Iterator[tuple[bool | Node, ForeignKeyField]]: ... + def delete_instance(self: Self, recursive: bool = ..., delete_nullable: bool = ...) -> Self: ... + def __hash__(self) -> int: ... + def __eq__(self, other: object) -> bool: ... + def __ne__(self, other: object) -> bool: ... + def __sql__(self, ctx: Context) -> Context: ... + @classmethod + def bind( + cls, database: Database, bind_refs: bool = ..., bind_backrefs: bool = ..., _exclude: MutableSet[type[Model]] | None = ... + ) -> bool: ... + @classmethod + def bind_ctx(cls, database: Database, bind_refs: bool = ..., bind_backrefs: bool = ...) -> _BoundModelsContext: ... + @classmethod + def table_exists(cls) -> bool: ... + @classmethod + def create_table(cls, safe: bool = ..., *, fail_silently: bool = ..., **options: object) -> None: ... + @classmethod + def drop_table(cls, safe: bool = ..., drop_sequences: bool = ..., **options: object) -> None: ... + @classmethod + def truncate_table(cls, **options: object) -> None: ... + @classmethod + def index(cls, *fields: Field | Node | str, **kwargs: object) -> ModelIndex: ... + @classmethod + def add_index(cls, *fields: str | SQL | Index, **kwargs: object) -> None: ... + +# "Provide a separate reference to a model in a query." +class ModelAlias(Node, Generic[_ModelT]): + model: type[_ModelT] + alias: str | None + def __init__(self, model: type[_ModelT], alias: str | None = ...): ... + def __getattr__(self, attr: str) -> Any: ... + def __setattr__(self, attr: str, value: object) -> NoReturn: ... + def get_field_aliases(self) -> list[Field]: ... + def select(self, *selection: Field) -> ModelSelect[Incomplete]: ... + def __call__(self, **kwargs) -> _ModelT: ... + def __sql__(self, ctx: Context) -> Context: ... + +_TModelOrTable: TypeAlias = Union[type[Model], ModelAlias[Incomplete], Table] +_TSubquery: TypeAlias = Union[tuple[Query, type[Model]], type[Model], ModelAlias[Incomplete]] +_TFieldOrModel: TypeAlias = _TModelOrTable | Field + +class FieldAlias(Field): + source: Node + model: type[Model] + field: Field + # TODO (dargueta): Making an educated guess about `source`; might be `Node` + def __init__(self, source: MetaField, field: Field): ... + @classmethod + def create(cls, source: ModelAlias[Incomplete], field: str) -> FieldAlias: ... + def clone(self) -> FieldAlias: ... + def adapt(self, value: object) -> Any: ... + def python_value(self, value: object) -> Any: ... + def db_value(self, value: object) -> Any: ... + def __getattr__(self, attr: str) -> Incomplete: ... + def __sql__(self, ctx: Context) -> Context: ... + +def sort_models(models: Iterable[type[Model]]) -> list[type[Model]]: ... + +class _ModelQueryHelper: + default_row_type: ClassVar[int] + def objects(self: Self, constructor: Callable[..., Any] | None = ...) -> Self: ... + +class ModelRaw(_ModelQueryHelper, RawQuery, Generic[_ModelT]): + model: type[_ModelT] + def __init__(self, model: type[_ModelT], sql: str, params: tuple[Incomplete, ...], **kwargs: object): ... + def get(self) -> _ModelT: ... + +class BaseModelSelect(_ModelQueryHelper): + def union_all(self, rhs: object) -> ModelCompoundSelectQuery[Incomplete]: ... + __add__ = union_all + def union(self, rhs: object) -> ModelCompoundSelectQuery[Incomplete]: ... + __or__ = union + def intersect(self, rhs: object) -> ModelCompoundSelectQuery[Incomplete]: ... + __and__ = intersect + def except_(self, rhs: object) -> ModelCompoundSelectQuery[Incomplete]: ... + __sub__ = except_ + def __iter__(self) -> Iterator[Any]: ... + def prefetch(self, *subqueries: _TSubquery) -> list[Any]: ... + def get(self, database: Database | None = ...) -> Any: ... + def group_by(self: Self, *columns: type[Model] | Table | Field) -> Self: ... + +class ModelCompoundSelectQuery(BaseModelSelect, CompoundSelectQuery, Generic[_ModelT]): + model: type[_ModelT] + def __init__(self, model: type[_ModelT], *args: object, **kwargs: object): ... + +class ModelSelect(BaseModelSelect, Select, Generic[_ModelT]): + model: type[_ModelT] + def __init__(self, model: type[_ModelT], fields_or_models: Iterable[_TFieldOrModel], is_default: bool = ...): ... + def clone(self: Self) -> Self: ... + def select(self: Self, *fields_or_models: _TFieldOrModel) -> Self: ... # type: ignore + def switch(self: Self, ctx: type[Model] | None = ...) -> Self: ... + def join( # type: ignore + self: Self, + dest: type[Model] | Table | ModelAlias[Incomplete] | ModelSelect[Incomplete], + join_type: int = ..., + on: Column | Expression | Field | None = ..., + src: type[Model] | Table | ModelAlias[Incomplete] | ModelSelect[Incomplete] | None = ..., + attr: str | None = ..., + ) -> Self: ... + def join_from( + self, + src: type[Model] | Table | ModelAlias[Incomplete] | ModelSelect[Incomplete], + dest: type[Model] | Table | ModelAlias[Incomplete] | ModelSelect[Incomplete], + join_type: int = ..., + on: Column | Expression | Field | None = ..., + attr: str | None = ..., + ) -> ModelSelect[Incomplete]: ... + def ensure_join( + self, lm: type[Model], rm: type[Model], on: Column | Expression | Field | None = ..., **join_kwargs: Any + ) -> ModelSelect[Incomplete]: ... + # TODO (dargueta): 85% sure about the return value + def convert_dict_to_node(self, qdict: Mapping[str, object]) -> tuple[list[Expression], list[Field]]: ... + def filter(self, *args: Node, **kwargs: object) -> ModelSelect[Incomplete]: ... + def create_table(self, name: str, safe: bool = ..., **meta: object) -> None: ... + def __sql_selection__(self, ctx: Context, is_subquery: bool = ...) -> Context: ... + +class NoopModelSelect(ModelSelect[Incomplete]): + def __sql__(self, ctx: Context) -> Context: ... + +class _ModelWriteQueryHelper(_ModelQueryHelper): + model: type[Model] + def __init__(self, model: type[Model], *args: object, **kwargs: object): ... + def returning(self, *returning: type[Model] | Field) -> _ModelWriteQueryHelper: ... + +class ModelUpdate(_ModelWriteQueryHelper, Update): ... + +class ModelInsert(_ModelWriteQueryHelper, Insert): + default_row_type: ClassVar[int] + def returning(self, *returning: type[Model] | Field) -> ModelInsert: ... + def get_default_data(self) -> Mapping[str, object]: ... + def get_default_columns(self) -> Sequence[Field]: ... + +class ModelDelete(_ModelWriteQueryHelper, Delete): ... + +class ManyToManyQuery(ModelSelect[Incomplete]): + def __init__( + self, instance: Model, accessor: ManyToManyFieldAccessor, rel: _TFieldOrModel, *args: object, **kwargs: object + ): ... + def add(self, value: SelectQuery | type[Model] | Iterable[str], clear_existing: bool = ...) -> None: ... + def remove(self, value: SelectQuery | type[Model] | Iterable[str]) -> int | None: ... + def clear(self) -> int: ... + +class BaseModelCursorWrapper(DictCursorWrapper, Generic[_ModelT]): + ncols: int + columns: list[str] + converters: list[_ConvFunc] + fields: list[Field] + model: type[_ModelT] + select: Sequence[str] + def __init__(self, cursor: __ICursor, model: type[_ModelT], columns: Sequence[str] | None): ... + def process_row(self, row: tuple) -> Mapping[str, object]: ... # type: ignore + +class ModelDictCursorWrapper(BaseModelCursorWrapper[_ModelT]): + def process_row(self, row: tuple[Incomplete, ...]) -> dict[str, Any]: ... + +class ModelTupleCursorWrapper(ModelDictCursorWrapper[_ModelT]): + constructor: ClassVar[Callable[[Sequence[Any]], tuple[Incomplete, ...]]] + def process_row(self, row: tuple[Incomplete, ...]) -> tuple[Incomplete, ...]: ... # type: ignore + +class ModelNamedTupleCursorWrapper(ModelTupleCursorWrapper[_ModelT]): ... + +class ModelObjectCursorWrapper(ModelDictCursorWrapper[_ModelT]): + constructor: type[_ModelT] | Callable[[Any], _ModelT] + is_model: bool + # TODO (dargueta): `select` is some kind of Sequence + def __init__( + self, + cursor: __ICursor, + model: _ModelT, + select: Sequence[Incomplete], + constructor: type[_ModelT] | Callable[[Any], _ModelT], + ): ... + def process_row(self, row: tuple) -> _ModelT: ... # type: ignore + +class ModelCursorWrapper(BaseModelCursorWrapper[_ModelT]): + # TODO (dargueta) -- Iterable[Union[Join, ...]] + from_list: Iterable[Incomplete] + # TODO (dargueta) -- Mapping[, tuple[?, ?, Callable[..., _ModelT], int?]] + joins: Mapping[Hashable, tuple[object, object, Callable[..., _ModelT], int]] + key_to_constructor: dict[type[_ModelT], Callable[..., _ModelT]] + src_is_dest: dict[type[Model], bool] + src_to_dest: list[ + tuple[Incomplete, Incomplete, Incomplete, bool, Incomplete] + ] # TODO -- tuple[, join_type[1], join_type[0], bool, join_type[3]] + column_keys: list[Incomplete] + def __init__( + self, + cursor: __ICursor, + model: type[_ModelT], + select, + from_list: Iterable[object], + joins: Mapping[Hashable, tuple[object, object, Callable[..., _ModelT], int]], + ): ... + def initialize(self) -> None: ... + def process_row(self, row: tuple) -> _ModelT: ... # type: ignore + +class __PrefetchQuery(NamedTuple): + query: Query # TODO (dargueta): Verify + fields: Sequence[Field] | None + is_backref: bool | None + rel_models: list[type[Model]] | None + field_to_name: list[tuple[Field, str]] | None + model: type[Model] + +class PrefetchQuery(__PrefetchQuery): + def populate_instance(self, instance: Model, id_map: Mapping[tuple[object, object], object]): ... + def store_instance(self, instance: Model, id_map: MutableMapping[tuple[object, object], list[Model]]) -> None: ... + +def prefetch_add_subquery(sq: Query, subqueries: Iterable[_TSubquery]) -> list[PrefetchQuery]: ... +def prefetch(sq: Query, *subqueries: _TSubquery) -> list[object]: ...