-
-
Notifications
You must be signed in to change notification settings - Fork 484
Added dynamic class hook for from_queryset
manager
#427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
89a5797
6d8a439
0ca8abc
51207ba
8af158d
8ca96f8
98ca0cc
4102a7c
48feb3f
fae8316
dabdfa1
b032a3d
745d00f
354e9f4
78c8b93
d8b2427
12a313b
1883270
30e53d7
203eeaf
e43fd2b
5eba681
dae217a
725377d
1ae7eca
43ef5a9
a6cb6f3
164f83d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,14 +6,15 @@ | |
from mypy.checker import TypeChecker | ||
from mypy.mro import calculate_mro | ||
from mypy.nodes import ( | ||
Block, CallExpr, ClassDef, Context, Expression, MemberExpr, MypyFile, NameExpr, PlaceholderNode, StrExpr, | ||
SymbolTable, SymbolTableNode, TypeInfo, Var, | ||
GDEF, Argument, Block, CallExpr, ClassDef, Context, Expression, FuncDef, MemberExpr, MypyFile, NameExpr, | ||
PlaceholderNode, StrExpr, SymbolTable, SymbolTableNode, TypeInfo, Var, | ||
) | ||
from mypy.plugin import ( | ||
AttributeContext, ClassDefContext, DynamicClassDefContext, FunctionContext, MethodContext, | ||
) | ||
from mypy.plugins.common import add_method_to_class | ||
from mypy.semanal import SemanticAnalyzer | ||
from mypy.types import AnyType, Instance, NoneTyp, ProperType | ||
from mypy.types import AnyType, CallableType, Instance, NoneTyp, ProperType | ||
from mypy.types import Type as MypyType | ||
from mypy.types import TypeOfAny, UnionType | ||
|
||
|
@@ -94,6 +95,80 @@ def lookup_typeinfo_or_defer(self, fullname: str, *, | |
|
||
return sym.node | ||
|
||
def copy_method_to_another_class( | ||
self, | ||
ctx: ClassDefContext, | ||
self_type: Instance, | ||
new_method_name: str, | ||
method_node: FuncDef) -> None: | ||
if method_node.type is None: | ||
if not self.defer_till_next_iteration(reason='method_node.type is None'): | ||
raise new_helpers.TypeInfoNotFound(method_node.fullname) | ||
|
||
arguments, return_type = build_unannotated_method_args(method_node) | ||
add_method_to_class( | ||
ctx.api, | ||
ctx.cls, | ||
new_method_name, | ||
args=arguments, | ||
return_type=return_type, | ||
self_type=self_type) | ||
return | ||
|
||
method_type = cast(CallableType, method_node.type) | ||
if not isinstance(method_type, CallableType) and not self.defer_till_next_iteration( | ||
reason='method_node.type is not CallableType'): | ||
raise new_helpers.TypeInfoNotFound(method_node.fullname) | ||
|
||
arguments = [] | ||
bound_return_type = self.semanal_api.anal_type( | ||
method_type.ret_type, | ||
allow_placeholder=True) | ||
|
||
if bound_return_type is None and self.defer_till_next_iteration(): | ||
raise new_helpers.TypeInfoNotFound(method_node.fullname + ' return type') | ||
|
||
assert bound_return_type is not None | ||
|
||
if isinstance(bound_return_type, PlaceholderNode): | ||
raise new_helpers.TypeInfoNotFound('return type ' + method_node.fullname) | ||
|
||
for arg_name, arg_type, original_argument in zip( | ||
method_type.arg_names[1:], | ||
method_type.arg_types[1:], | ||
method_node.arguments[1:]): | ||
bound_arg_type = self.semanal_api.anal_type(arg_type, allow_placeholder=True) | ||
if bound_arg_type is None and not self.defer_till_next_iteration(reason='bound_arg_type is None'): | ||
error_msg = 'of {} argument of {}'.format(arg_name, method_node.fullname) | ||
raise new_helpers.TypeInfoNotFound(error_msg) | ||
|
||
assert bound_arg_type is not None | ||
|
||
if isinstance(bound_arg_type, PlaceholderNode) and self.defer_till_next_iteration( | ||
reason='bound_arg_type is None'): | ||
raise new_helpers.TypeInfoNotFound('of ' + arg_name + ' argument of ' + method_node.fullname) | ||
|
||
var = Var( | ||
name=original_argument.variable.name, | ||
type=arg_type) | ||
var.line = original_argument.variable.line | ||
var.column = original_argument.variable.column | ||
argument = Argument( | ||
variable=var, | ||
type_annotation=bound_arg_type, | ||
initializer=original_argument.initializer, | ||
kind=original_argument.kind) | ||
argument.set_line(original_argument) | ||
arguments.append(argument) | ||
|
||
add_method_to_class( | ||
ctx.api, | ||
ctx.cls, | ||
new_method_name, | ||
args=arguments, | ||
return_type=bound_return_type, | ||
self_type=self_type) | ||
|
||
def new_typeinfo(self, name: str, bases: List[Instance], module_fullname: Optional[str] = None) -> TypeInfo: | ||
class_def = ClassDef(name, Block([])) | ||
class_def.fullname = self.semanal_api.qualified_name(name) | ||
|
@@ -118,11 +193,46 @@ def __call__(self, ctx: DynamicClassDefContext) -> None: | |
self.semanal_api = cast(SemanticAnalyzer, ctx.api) | ||
self.create_new_dynamic_class() | ||
|
||
def generate_manager_info_and_module(self, base_manager_info: TypeInfo) -> Tuple[TypeInfo, MypyFile]: | ||
new_manager_info = self.semanal_api.basic_new_typeinfo( | ||
self.class_name, | ||
basetype_or_fallback=Instance( | ||
base_manager_info, | ||
[AnyType(TypeOfAny.unannotated)]) | ||
) | ||
new_manager_info.line = self.call_expr.line | ||
new_manager_info.defn.line = self.call_expr.line | ||
new_manager_info.metaclass_type = new_manager_info.calculate_metaclass_type() | ||
|
||
current_module = self.semanal_api.cur_mod_node | ||
current_module.names[self.class_name] = SymbolTableNode( | ||
GDEF, | ||
new_manager_info, | ||
plugin_generated=True) | ||
return new_manager_info, current_module | ||
|
||
@abstractmethod | ||
def create_new_dynamic_class(self) -> None: | ||
raise NotImplementedError | ||
|
||
|
||
class DynamicClassFromMethodCallback(DynamicClassPluginCallback): | ||
callee: MemberExpr | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add class attribute There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding an assert in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, in |
||
def __call__(self, ctx: DynamicClassDefContext) -> None: | ||
self.class_name = ctx.name | ||
self.call_expr = ctx.call | ||
|
||
assert ctx.call.callee is not None | ||
if not isinstance(ctx.call.callee, MemberExpr): | ||
# throw error? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, I will include this in next PR |
||
return | ||
self.callee = ctx.call.callee | ||
|
||
self.semanal_api = cast(SemanticAnalyzer, ctx.api) | ||
self.create_new_dynamic_class() | ||
|
||
|
||
class ClassDefPluginCallback(SemanalPluginCallback): | ||
reason: Expression | ||
class_defn: ClassDef | ||
|
@@ -396,3 +506,12 @@ def get_nested_meta_node_for_current_class(info: TypeInfo) -> Optional[TypeInfo] | |
if metaclass_sym is not None and isinstance(metaclass_sym.node, TypeInfo): | ||
return metaclass_sym.node | ||
return None | ||
|
||
|
||
def build_unannotated_method_args(method_node: FuncDef) -> Tuple[List[Argument], MypyType]: | ||
prepared_arguments = [] | ||
for argument in method_node.arguments[1:]: | ||
argument.type_annotation = AnyType(TypeOfAny.unannotated) | ||
prepared_arguments.append(argument) | ||
return_type = AnyType(TypeOfAny.unannotated) | ||
return prepared_arguments, return_type |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
from typing import List, Tuple | ||
|
||
from mypy.nodes import Argument, FuncDef, NameExpr, StrExpr, TypeInfo | ||
from mypy.plugin import ClassDefContext | ||
from mypy.types import AnyType, Instance | ||
from mypy.types import Type as MypyType | ||
from mypy.types import TypeOfAny | ||
|
||
from mypy_django_plugin.lib import fullnames, helpers | ||
|
||
|
||
def build_unannotated_method_args(method_node: FuncDef) -> Tuple[List[Argument], MypyType]: | ||
prepared_arguments = [] | ||
for argument in method_node.arguments[1:]: | ||
argument.type_annotation = AnyType(TypeOfAny.unannotated) | ||
prepared_arguments.append(argument) | ||
return_type = AnyType(TypeOfAny.unannotated) | ||
return prepared_arguments, return_type | ||
|
||
|
||
class ManagerFromQuerySetCallback(helpers.DynamicClassFromMethodCallback): | ||
def create_new_dynamic_class(self) -> None: | ||
|
||
base_manager_info = self.callee.expr.node # type: ignore | ||
|
||
if base_manager_info is None and not self.defer_till_next_iteration(reason='base_manager_info is None'): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this Also, let's see what type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kszmigiel I still want you to add proper error string here with an explanation of the conditions of why it could be |
||
# what exception should be thrown here? | ||
return | ||
|
||
assert isinstance(base_manager_info, TypeInfo) | ||
|
||
new_manager_info, current_module = self.generate_manager_info_and_module(base_manager_info) | ||
|
||
passed_queryset = self.call_expr.args[0] | ||
assert isinstance(passed_queryset, NameExpr) | ||
|
||
derived_queryset_fullname = passed_queryset.fullname | ||
assert derived_queryset_fullname is not None | ||
|
||
sym = self.semanal_api.lookup_fully_qualified_or_none(derived_queryset_fullname) | ||
assert sym is not None | ||
if sym.node is None and not self.defer_till_next_iteration(reason='sym.node is None'): | ||
# inherit from Any to prevent false-positives, if queryset class cannot be resolved | ||
new_manager_info.fallback_to_any = True | ||
return | ||
|
||
derived_queryset_info = sym.node | ||
assert isinstance(derived_queryset_info, TypeInfo) | ||
|
||
if len(self.call_expr.args) > 1: | ||
expr = self.call_expr.args[1] | ||
assert isinstance(expr, StrExpr) | ||
custom_manager_generated_name = expr.value | ||
else: | ||
custom_manager_generated_name = base_manager_info.name + 'From' + derived_queryset_info.name | ||
|
||
custom_manager_generated_fullname = '.'.join(['django.db.models.manager', custom_manager_generated_name]) | ||
if 'from_queryset_managers' not in base_manager_info.metadata: | ||
base_manager_info.metadata['from_queryset_managers'] = {} | ||
base_manager_info.metadata['from_queryset_managers'][ | ||
custom_manager_generated_fullname] = new_manager_info.fullname | ||
class_def_context = ClassDefContext( | ||
cls=new_manager_info.defn, | ||
reason=self.call_expr, api=self.semanal_api) | ||
self_type = Instance(new_manager_info, []) | ||
# we need to copy all methods in MRO before django.db.models.query.QuerySet | ||
for class_mro_info in derived_queryset_info.mro: | ||
if class_mro_info.fullname == fullnames.QUERYSET_CLASS_FULLNAME: | ||
break | ||
for name, sym in class_mro_info.names.items(): | ||
if isinstance(sym.node, FuncDef): | ||
self.copy_method_to_another_class( | ||
class_def_context, | ||
self_type, | ||
new_method_name=name, | ||
method_node=sym.node) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that this PR has unrelated changes from other PRs. Is that the case, @kszmigiel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I've updated stub files to the
master
version, I can revert it if needed (this commit: b032a3d)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The proper way to do it is to
rebase
this PR based on currentmaster
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 It'll be quite a challenge for me, but I'll try