diff --git a/ninja/api_exceptions.py b/ninja/api_exceptions.py new file mode 100644 index 000000000..03d9aa9b3 --- /dev/null +++ b/ninja/api_exceptions.py @@ -0,0 +1,43 @@ +from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union + +from django.http import HttpRequest, HttpResponse +from ninja.errors import ValidationError, ValidationErrorContext +_E = TypeVar("_E", bound=Exception) +Exc = Union[_E, Type[_E]] +ExcHandler = Callable[[HttpRequest, Exc[_E]], HttpResponse] + +class ExceptionRegistry: + def __init__(self): + self._handlers: Dict[Exc, ExcHandler] = {} + + def add_handler(self, exc_class: Type[_E], handler: ExcHandler[_E]) -> None: + assert issubclass(exc_class, Exception) + self._handlers[exc_class] = handler + + def lookup(self, exc: Exc[_E]) -> Optional[ExcHandler[_E]]: + for cls in type(exc).__mro__: + if cls in self._handlers: + return self._handlers[cls] + return None + + def validation_error_from_contexts( + self, error_contexts: List[ValidationErrorContext] + ) -> ValidationError: + errors: List[Dict[str, Any]] = [] + for context in error_contexts: + model = context.model + e = context.pydantic_validation_error + for i in e.errors(include_url=False): + i["loc"] = ( + model.__ninja_param_source__, + ) + model.__ninja_flatten_map_reverse__.get(i["loc"], i["loc"]) + # removing pydantic hints + del i["input"] # type: ignore + if ( + "ctx" in i + and "error" in i["ctx"] + and isinstance(i["ctx"]["error"], Exception) + ): + i["ctx"]["error"] = str(i["ctx"]["error"]) + errors.append(dict(i)) + return ValidationError(errors) diff --git a/ninja/api_registry.py b/ninja/api_registry.py new file mode 100644 index 000000000..f06654250 --- /dev/null +++ b/ninja/api_registry.py @@ -0,0 +1,26 @@ +import os +from typing import List + +from ninja.errors import ConfigError +from ninja.utils import is_debug_server + + +class ApiRegistry: + _registry: List[str] = [] + _imported_while_running_in_debug_server = is_debug_server() + + @classmethod + def validate_namespace(cls, urls_namespace: str) -> None: + skip_registry = os.environ.get("NINJA_SKIP_REGISTRY", False) + if ( + not skip_registry + and urls_namespace in cls._registry + and not cls.debug_server_url_reimport() + ): + msg = f"..." + raise ConfigError(msg.strip()) + cls._registry.append(urls_namespace) + + @classmethod + def debug_server_url_reimport(cls) -> bool: + return cls._imported_while_running_in_debug_server and not is_debug_server() diff --git a/ninja/api_responses.py b/ninja/api_responses.py new file mode 100644 index 000000000..709ee0ffa --- /dev/null +++ b/ninja/api_responses.py @@ -0,0 +1,26 @@ +from typing import Any, Optional +from django.http import HttpRequest, HttpResponse + +class ResponseFactory: + def __init__(self, renderer): + self.renderer = renderer + + def create_response( + self, + request: HttpRequest, + data: Any, + *, + status: int, + temporal_response: Optional[HttpResponse] = None, + ) -> HttpResponse: + content = self.renderer.render(request, data, response_status=status) + if temporal_response: + temporal_response.content = content + return temporal_response + return HttpResponse(content, status=status, content_type=self.get_content_type()) + + def create_temporal_response(self) -> HttpResponse: + return HttpResponse("", content_type=self.get_content_type()) + + def get_content_type(self) -> str: + return f"{self.renderer.media_type}; charset={self.renderer.charset}" diff --git a/ninja/api_routing.py b/ninja/api_routing.py new file mode 100644 index 000000000..c9ff55673 --- /dev/null +++ b/ninja/api_routing.py @@ -0,0 +1,71 @@ +from typing import ( + Any, + List, + Optional, + Tuple, + Union, +) +from ninja.throttling import BaseThrottle +from django.urls import URLPattern, URLResolver, reverse +from ninja.constants import NOT_SET, NOT_SET_TYPE +from ninja.router import Router +from ninja.openapi.urls import get_openapi_urls, get_root_url +from ninja.utils import normalize_path +from django.utils.module_loading import import_string + +class RouterManager: + def __init__(self, api: "NinjaAPI", default_router: Optional[Router] = None): + self.api = api + self._routers: List[Tuple[str, Router]] = [] + self.default_router = default_router or Router() + self.add_router("", self.default_router) + + def add_router( + self, + prefix: str, + router: Union[Router, str], + *, + auth: Any = NOT_SET, + throttle: Union[BaseThrottle, List[BaseThrottle], NOT_SET_TYPE] = NOT_SET, + tags: Optional[List[str]] = None, + parent_router: Optional[Router] = None, + ) -> None: + if isinstance(router, str): + router = import_string(router) + assert isinstance(router, Router) + + if auth is not NOT_SET: + router.auth = auth + + if throttle is not NOT_SET: + router.throttle = throttle + + if tags is not None: + router.tags = tags + + # Inherit API-level decorators from default router + # Prepend API decorators so they execute first (outer decorators) + router._decorators = self.default_router._decorators + router._decorators + + if parent_router: + parent_prefix = next( + (path for path, r in self._routers if r is parent_router), None + ) # pragma: no cover + assert parent_prefix is not None + prefix = normalize_path("/".join((parent_prefix, prefix))).lstrip("/") + + self._routers.extend(router.build_routers(prefix)) + router.set_api_instance(self.api, parent_router) + + def _get_urls(self) -> List[Union[URLResolver, URLPattern]]: + result = get_openapi_urls(self.api) + + for prefix, router in self._routers: + result.extend(router.urls_paths(prefix)) + + result.append(get_root_url(self.api)) + return result + + def get_root_path(self, path_params): + name = f"{self.api.urls_namespace}:api-root" + return reverse(name, kwargs=path_params) diff --git a/ninja/main.py b/ninja/main.py index af5ed5df6..158d14411 100644 --- a/ninja/main.py +++ b/ninja/main.py @@ -1,4 +1,3 @@ -import os from typing import ( TYPE_CHECKING, Any, @@ -14,9 +13,12 @@ ) from django.http import HttpRequest, HttpResponse -from django.urls import URLPattern, URLResolver, reverse -from django.utils.module_loading import import_string +from django.urls import URLPattern, URLResolver +from ninja.api_exceptions import ExceptionRegistry +from ninja.api_registry import ApiRegistry +from ninja.api_responses import ResponseFactory +from ninja.api_routing import RouterManager from ninja.constants import NOT_SET, NOT_SET_TYPE from ninja.decorators import DecoratorMode from ninja.errors import ( @@ -28,13 +30,12 @@ from ninja.openapi import get_schema from ninja.openapi.docs import DocsBase, Swagger from ninja.openapi.schema import OpenAPISchema -from ninja.openapi.urls import get_openapi_urls, get_root_url from ninja.parser import Parser from ninja.renderers import BaseRenderer, JSONRenderer from ninja.router import Router from ninja.throttling import BaseThrottle from ninja.types import DictStrAny, TCallable -from ninja.utils import is_debug_server, normalize_path +from ninja.utils import is_debug_server if TYPE_CHECKING: from .operation import Operation # pragma: no cover @@ -53,6 +54,8 @@ class NinjaAPI: _registry: List[str] = [] + print("Teste pra Arquitetura de Software") + def __init__( self, *, @@ -96,10 +99,12 @@ def __init__( self.servers = servers or [] self.urls_namespace = urls_namespace or f"api-{self.version}" self.renderer = renderer or JSONRenderer() + self.response_factory = ResponseFactory(self.renderer) self.parser = parser or Parser() self.openapi_extra = openapi_extra or {} - self._exception_handlers: Dict[Exc, ExcHandler] = {} + self.exceptions = ExceptionRegistry() + self.set_default_exception_handlers() self.auth: Optional[Union[Sequence[Callable], NOT_SET_TYPE]] @@ -110,10 +115,12 @@ def __init__( self.auth = auth self.throttle = throttle + + + self.routing = RouterManager(self, default_router=default_router) + self.default_router = self.routing.default_router + self._routers = self.routing._routers - self._routers: List[Tuple[str, Router]] = [] - self.default_router = default_router or Router() - self.add_router("", self.default_router) def get( self, @@ -386,7 +393,7 @@ def add_decorator( """ # Store decorator on default router - will be inherited by all routers during build self.default_router.add_decorator(decorator, mode) - + def add_router( self, prefix: str, @@ -397,32 +404,15 @@ def add_router( tags: Optional[List[str]] = None, parent_router: Optional[Router] = None, ) -> None: - if isinstance(router, str): - router = import_string(router) - assert isinstance(router, Router) - - if auth is not NOT_SET: - router.auth = auth - - if throttle is not NOT_SET: - router.throttle = throttle - - if tags is not None: - router.tags = tags - - # Inherit API-level decorators from default router - # Prepend API decorators so they execute first (outer decorators) - router._decorators = self.default_router._decorators + router._decorators - - if parent_router: - parent_prefix = next( - (path for path, r in self._routers if r is parent_router), None - ) # pragma: no cover - assert parent_prefix is not None - prefix = normalize_path("/".join((parent_prefix, prefix))).lstrip("/") + self.routing.add_router( + prefix=prefix, + router=router, + auth=auth, + throttle=throttle, + tags=tags, + parent_router=parent_router, + ) - self._routers.extend(router.build_routers(prefix)) - router.set_api_instance(self, parent_router) @property def urls(self) -> Tuple[List[Union[URLResolver, URLPattern]], str, str]: @@ -435,25 +425,12 @@ def urls(self) -> Tuple[List[Union[URLResolver, URLPattern]], str, str]: """ self._validate() return ( - self._get_urls(), + self.routing._get_urls(), "ninja", self.urls_namespace.split(":")[-1], # ^ if api included into nested urls, we only care about last bit here ) - def _get_urls(self) -> List[Union[URLResolver, URLPattern]]: - result = get_openapi_urls(self) - - for prefix, router in self._routers: - result.extend(router.urls_paths(prefix)) - - result.append(get_root_url(self)) - return result - - def get_root_path(self, path_params: DictStrAny) -> str: - name = f"{self.urls_namespace}:api-root" - return reverse(name, kwargs=path_params) - def create_response( self, request: HttpRequest, @@ -466,23 +443,15 @@ def create_response( status = temporal_response.status_code assert status - content = self.renderer.render(request, data, response_status=status) - - if temporal_response: - response = temporal_response - response.content = content - else: - response = HttpResponse( - content, status=status, content_type=self.get_content_type() - ) - - return response + return self.response_factory.create_response( + request, data, status=status, temporal_response=temporal_response + ) def create_temporal_response(self, request: HttpRequest) -> HttpResponse: - return HttpResponse("", content_type=self.get_content_type()) + return self.response_factory.create_temporal_response() def get_content_type(self) -> str: - return f"{self.renderer.media_type}; charset={self.renderer.charset}" + return self.response_factory.get_content_type() def get_openapi_schema( self, @@ -491,7 +460,7 @@ def get_openapi_schema( path_params: Optional[DictStrAny] = None, ) -> OpenAPISchema: if path_prefix is None: - path_prefix = self.get_root_path(path_params or {}) + path_prefix = self.routing.get_root_path(path_params or {}) return get_schema(api=self, path_prefix=path_prefix) def get_openapi_operation_id(self, operation: "Operation") -> str: @@ -509,8 +478,7 @@ def get_operation_url_name(self, operation: "Operation", router: Router) -> str: def add_exception_handler( self, exc_class: Type[_E], handler: ExcHandler[_E] ) -> None: - assert issubclass(exc_class, Exception) - self._exception_handlers[exc_class] = handler + self.exceptions.add_handler(exc_class, handler) def exception_handler( self, exc_class: Type[Exception] @@ -525,7 +493,7 @@ def set_default_exception_handlers(self) -> None: set_default_exc_handlers(self) def on_exception(self, request: HttpRequest, exc: Exc[_E]) -> HttpResponse: - handler = self._lookup_exception_handler(exc) + handler = self.exceptions.lookup(exc) if handler is None: raise exc return handler(request, exc) @@ -533,49 +501,11 @@ def on_exception(self, request: HttpRequest, exc: Exc[_E]) -> HttpResponse: def validation_error_from_error_contexts( self, error_contexts: List[ValidationErrorContext] ) -> ValidationError: - errors: List[Dict[str, Any]] = [] - for context in error_contexts: - model = context.model - e = context.pydantic_validation_error - for i in e.errors(include_url=False): - i["loc"] = ( - model.__ninja_param_source__, - ) + model.__ninja_flatten_map_reverse__.get(i["loc"], i["loc"]) - # removing pydantic hints - del i["input"] # type: ignore - if ( - "ctx" in i - and "error" in i["ctx"] - and isinstance(i["ctx"]["error"], Exception) - ): - i["ctx"]["error"] = str(i["ctx"]["error"]) - errors.append(dict(i)) - return ValidationError(errors) - - def _lookup_exception_handler(self, exc: Exc[_E]) -> Optional[ExcHandler[_E]]: - for cls in type(exc).__mro__: - if cls in self._exception_handlers: - return self._exception_handlers[cls] - - return None + return self.exceptions.validation_error_from_contexts(error_contexts) def _validate(self) -> None: # urls namespacing validation - skip_registry = os.environ.get("NINJA_SKIP_REGISTRY", False) - if ( - not skip_registry - and self.urls_namespace in NinjaAPI._registry - and not debug_server_url_reimport() - ): - msg = f""" -Looks like you created multiple NinjaAPIs or TestClients -To let ninja distinguish them you need to set either unique version or urls_namespace - - NinjaAPI(..., version='2.0.0') - - NinjaAPI(..., urls_namespace='otherapi') -Already registered: {NinjaAPI._registry} -""" - raise ConfigError(msg.strip()) - NinjaAPI._registry.append(self.urls_namespace) + ApiRegistry.validate_namespace(self.urls_namespace) _imported_while_running_in_debug_server = is_debug_server() diff --git a/ninja/management/utils.py b/ninja/management/utils.py index 39ea7b858..0ed03e9e6 100644 --- a/ninja/management/utils.py +++ b/ninja/management/utils.py @@ -5,45 +5,96 @@ def command_docstring(cmd: Type[BaseCommand]) -> str: + base_args = _get_base_args(cmd) + parser = cmd().create_parser("command", "") + + doc = _build_doc_header(cmd, parser.description or "") + + args = _get_command_args(parser, base_args) + if args: + doc += _format_attributes_block(args) + + return doc + +def _get_base_args(cmd: Type[BaseCommand]): base_args = [] + if cmd is not BaseCommand: # pragma: no branch base_parser = cmd().create_parser("base", "") for group in base_parser._action_groups: for action in group._group_actions: base_args.append(",".join(action.option_strings)) - parser = cmd().create_parser("command", "") - doc = parser.description or "" + + return base_args + + +def _build_doc_header(cmd: Type[BaseCommand], description: str) -> str: + doc = description or "" if cmd.__doc__: # pragma: no branch - if doc: # pragma: no branch + if doc: doc += "\n\n" doc += textwrap.dedent(cmd.__doc__) + + return doc + + +def _get_command_args(parser, base_args): args = [] + for group in parser._action_groups: for action in group._group_actions: if "--help" in action.option_strings: continue - name = ",".join(action.option_strings) - action_type = action.type - if not action_type and action.nargs != 0: - action_type = str - if action_type: - if isinstance(action_type, type): # pragma: no branch - action_type = action_type.__name__ - name += f" ({action_type})" - help = action.help or "" - if help and not action.required and action.nargs != 0: - if not help.endswith("."): - help += "." - if action.default is not None: - help += f" Defaults to {action.default}." - else: - help += " Optional." - args.append((name, help)) - # Sort args from this class first, then base args. + + args.append(_extract_action_info(action)) + args.sort(key=lambda o: (o[0] in base_args, o[0])) - if args: # pragma: no branch - doc += "\n\nAttributes:" - for name, description in args: - doc += f"\n {name}: {description}" - return doc + + return args + + +def _extract_action_info(action): + name = ",".join(action.option_strings) + action_type = _extract_action_type(action) + + if action_type: + name += f" ({action_type})" + + help_text = _build_help_text(action) + + return (name, help_text) + + +def _extract_action_type(action): + action_type = action.type + + if not action_type and action.nargs != 0: + action_type = str + + if isinstance(action_type, type): # pragma: no branch + return action_type.__name__ + + return action_type + + +def _build_help_text(action): + help_text = action.help or "" + + if help_text and not action.required and action.nargs != 0: + if not help_text.endswith("."): + help_text += "." + + if action.default is not None: + help_text += f" Defaults to {action.default}." + else: + help_text += " Optional." + + return help_text + + +def _format_attributes_block(args): + block = "\n\nAttributes:" + for name, description in args: + block += f"\n {name}: {description}" + return block diff --git a/ninja/openapi/operation_details_builder.py b/ninja/openapi/operation_details_builder.py new file mode 100644 index 000000000..2e5bb41ba --- /dev/null +++ b/ninja/openapi/operation_details_builder.py @@ -0,0 +1,90 @@ +from typing import Any, Dict, List, Optional +from ninja.operation import Operation +from ninja.types import DictStrAny +from django.utils.termcolors import make_style + +bold_red_style = make_style(opts=("bold",), fg="red") + + +class OperationDetailBuilder: + def __init__(self, body_content_types): + self.body_content_types = body_content_types + + def build(self, operation: Operation, open_api_schema) -> DictStrAny: + op_id = operation.operation_id or open_api_schema.api.get_openapi_operation_id(operation) + if op_id in open_api_schema.all_operation_ids: + print( + bold_red_style( + f'Warning: operation_id "{op_id}" is already used (Try giving a different name to: {operation.view_func.__module__}.{operation.view_func.__name__})' + ) + ) + open_api_schema.all_operation_ids.add(op_id) + result = { + "operationId": op_id, + "summary": operation.summary, + "parameters": self.operation_parameters(operation, open_api_schema), + "responses": open_api_schema.response_builder.build(operation, open_api_schema), + } + + if operation.description: + result["description"] = operation.description + + if operation.tags: + result["tags"] = operation.tags + + if operation.deprecated: + result["deprecated"] = operation.deprecated # type: ignore + + body = open_api_schema.request_body_builder.request_body(operation, open_api_schema) + if body: + result["requestBody"] = body + + security = self.operation_security(operation, open_api_schema) + if security: + result["security"] = security + + if operation.openapi_extra: + self.deep_dict_update(result, operation.openapi_extra) + + return result + + def deep_dict_update( + self, main_dict: Dict[Any, Any], update_dict: Dict[Any, Any] + ) -> None: + for key in update_dict: + if ( + key in main_dict + and isinstance(main_dict[key], dict) + and isinstance(update_dict[key], dict) + ): + self.deep_dict_update( + main_dict[key], update_dict[key] + ) # pragma: no cover + elif ( + key in main_dict + and isinstance(main_dict[key], list) + and isinstance(update_dict[key], list) + ): + main_dict[key].extend(update_dict[key]) + else: + main_dict[key] = update_dict[key] + + + def operation_parameters(self, operation: Operation, open_api_schema) -> List[DictStrAny]: + result = [] + for model in operation.models: + if model.__ninja_param_source__ not in self.body_content_types: + result.extend(open_api_schema.param_extractor._extract_parameters(model)) + return result + + def operation_security(self, operation: Operation, open_api_schema) -> Optional[List[DictStrAny]]: + if not operation.auth_callbacks: + return None + result = [] + for auth in operation.auth_callbacks: + if hasattr(auth, "openapi_security_schema"): + scopes: List[DictStrAny] = [] # TODO: scopes + name = auth.__class__.__name__ + result.append({name: scopes}) # TODO: check if unique + open_api_schema.securitySchemes[name] = auth.openapi_security_schema + return result \ No newline at end of file diff --git a/ninja/openapi/param_extractor.py b/ninja/openapi/param_extractor.py new file mode 100644 index 000000000..0f546db23 --- /dev/null +++ b/ninja/openapi/param_extractor.py @@ -0,0 +1,115 @@ +from typing import Generator, List, Tuple +from ninja.params.models import TModel +from ninja.schema import NinjaGenerateJsonSchema +from ninja.types import DictStrAny + + +class ParameterExtractor: + def __init__(self, ref_template, open_api_schema): + self.ref_template = ref_template + self.open_api_schema = open_api_schema + + + def _extract_parameters(self, model: TModel) -> List[DictStrAny]: + result = [] + + schema = model.model_json_schema( + ref_template=self.ref_template, + schema_generator=NinjaGenerateJsonSchema, + ) + + required = set(schema.get("required", [])) + properties = schema["properties"] + + if "$defs" in schema: + self.add_schema_definitions(schema["$defs"]) + + for name, details in properties.items(): + is_required = name in required + p_name: str + p_schema: DictStrAny + p_required: bool + for p_name, p_schema, p_required in self.flatten_properties( + name, details, is_required, schema.get("$defs", {}) + ): + if not p_schema.get("include_in_schema", True): + continue + + param = { + "in": model.__ninja_param_source__, + "name": p_name, + "schema": p_schema, + "required": p_required, + } + + # copy description from schema description to param description + if "description" in p_schema: + param["description"] = p_schema["description"] + if "examples" in p_schema: + param["examples"] = p_schema["examples"] + elif "example" in p_schema: + param["example"] = p_schema["example"] + if "deprecated" in p_schema: + param["deprecated"] = p_schema["deprecated"] + + result.append(param) + + return result + + def flatten_properties( + self, + prop_name: str, + prop_details: DictStrAny, + prop_required: bool, + definitions: DictStrAny, + ) -> Generator[Tuple[str, DictStrAny, bool], None, None]: + """ + extracts all nested model's properties into flat properties + (used f.e. in GET params with multiple arguments and models) + """ + if "allOf" in prop_details: + resolve_allOf(prop_details, definitions) + if len(prop_details["allOf"]) == 1 and "enum" in prop_details["allOf"][0]: + # is_required = "default" not in prop_details + yield prop_name, prop_details, prop_required + else: # pragma: no cover + # TODO: this code was for pydanitc 1.7+ ... <2.9 - check if this is still needed + for item in prop_details["allOf"]: + yield from self.flatten_properties("", item, True, definitions) + + elif "items" in prop_details and "$ref" in prop_details["items"]: + def_name = prop_details["items"]["$ref"].rsplit("/", 1)[-1] + prop_details["items"].update(definitions[def_name]) + del prop_details["items"]["$ref"] # seems num data is there so ref not needed + yield prop_name, prop_details, prop_required + + elif "$ref" in prop_details: + def_name = prop_details["$ref"].split("/")[-1] + definition = definitions[def_name] + yield from self.flatten_properties(prop_name, definition, prop_required, definitions) + + elif "properties" in prop_details: + required = set(prop_details.get("required", [])) + for k, v in prop_details["properties"].items(): + is_required = k in required + yield from self.flatten_properties(k, v, is_required, definitions) + else: + yield prop_name, prop_details, prop_required + + def add_schema_definitions(self, definitions: dict) -> None: + # TODO: check if schema["definitions"] are unique + # if not - workaround (maybe use pydantic.schema.schema(models)) to process list of models + # assert set(definitions.keys()) - set(self.schemas.keys()) == set() + # ::TODO:: this is broken in interesting ways for by_alias, + # because same schema (name) can have different values + self.open_api_schema.add_schema_definitions(definitions) + +def resolve_allOf(details: DictStrAny, definitions: DictStrAny) -> None: + """ + resolves all $ref's in 'allOf' section + """ + for item in details["allOf"]: + if "$ref" in item: + def_name = item["$ref"].rsplit("/", 1)[-1] + item.update(definitions[def_name]) + del item["$ref"] diff --git a/ninja/openapi/request_body_builder.py b/ninja/openapi/request_body_builder.py new file mode 100644 index 000000000..5406643b8 --- /dev/null +++ b/ninja/openapi/request_body_builder.py @@ -0,0 +1,66 @@ +import itertools +from typing import List, Tuple +from ninja.openapi.schema_builder import SchemaBuilder +from ninja.operation import Operation +from ninja.params.models import TModels +from ninja.types import DictStrAny + + +class RequestBodyBuilder: + def __init__(self, body_content_types, ref_template): + self.body_content_types = body_content_types + self.ref_template = ref_template + + def request_body(self, operation: Operation, open_api_schema) -> DictStrAny: + models = [ + m + for m in operation.models + if m.__ninja_param_source__ in self.body_content_types + ] + if not models: + return {} + + if len(models) == 1: + model = models[0] + content_type = self.body_content_types[model.__ninja_param_source__] + schema, required = open_api_schema.schema_builder._create_schema_from_model( + model, remove_level=model.__ninja_param_source__ == "body" + ) + else: + schema, content_type = self._create_multipart_schema_from_models(models, open_api_schema) + required = True + + return { + "content": {content_type: {"schema": schema}}, + "required": required, + } + + def _create_multipart_schema_from_models( + self, models: TModels, open_api_schema + ) -> Tuple[DictStrAny, str]: + # We have File and Form or Body, so we need to use multipart (File) + content_type = self.body_content_types["file"] + + # get the various schemas + result = self.merge_schemas([ + open_api_schema.schema_builder._create_schema_from_model(model, remove_level=False)[0] + for model in models + ]) + result["title"] = "MultiPartBodyParams" + + return result, content_type + + def merge_schemas(self, schemas: List[DictStrAny]) -> DictStrAny: + result = schemas[0] + for scm in schemas[1:]: + result["properties"].update(scm["properties"]) + + required_list = result.get("required", []) + required_list.extend( + itertools.chain.from_iterable( + schema.get("required", ()) for schema in schemas[1:] + ) + ) + if required_list: + result["required"] = required_list + return result \ No newline at end of file diff --git a/ninja/openapi/response_builder.py b/ninja/openapi/response_builder.py new file mode 100644 index 000000000..b96b3f4bc --- /dev/null +++ b/ninja/openapi/response_builder.py @@ -0,0 +1,28 @@ +from http.client import responses +from typing import Any, Dict +from ninja.constants import NOT_SET +from ninja.operation import Operation + + +class ResponseBuilder: + def build(self, operation: Operation, open_api_schema): + assert bool(operation.response_models), f"{operation.response_models} empty" + + result = {} + for status, model in operation.response_models.items(): + if status == Ellipsis: + continue # it's not yet clear what it means if user wants to output any other code + + description = responses.get(status, "Unknown Status Code") + details: Dict[int, Any] = {status: {"description": description}} + if model not in [None, NOT_SET]: + # ::TODO:: test this: by_alias == True + schema = open_api_schema.schema_builder._create_schema_from_model( + model, by_alias=operation.by_alias, mode="serialization" + )[0] + details[status]["content"] = { + open_api_schema.api.renderer.media_type: {"schema": schema} + } + result.update(details) + + return result \ No newline at end of file diff --git a/ninja/openapi/schema.py b/ninja/openapi/schema.py index 93a64e981..cf0c76966 100644 --- a/ninja/openapi/schema.py +++ b/ninja/openapi/schema.py @@ -1,15 +1,13 @@ -import itertools import re -from http.client import responses -from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Dict, Set -from django.utils.termcolors import make_style -from pydantic.json_schema import JsonSchemaMode -from ninja.constants import NOT_SET -from ninja.operation import Operation -from ninja.params.models import TModel, TModels -from ninja.schema import NinjaGenerateJsonSchema +from ninja.openapi.operation_details_builder import OperationDetailBuilder +from ninja.openapi.param_extractor import ParameterExtractor +from ninja.openapi.request_body_builder import RequestBodyBuilder +from ninja.openapi.response_builder import ResponseBuilder +from ninja.openapi.schema_builder import SchemaBuilder +from ninja.params.models import TModel from ninja.types import DictStrAny from ninja.utils import normalize_path @@ -29,10 +27,6 @@ def get_schema(api: "NinjaAPI", path_prefix: str = "") -> "OpenAPISchema": openapi = OpenAPISchema(api, path_prefix) return openapi - -bold_red_style = make_style(opts=("bold",), fg="red") - - class OpenAPISchema(dict): def __init__(self, api: "NinjaAPI", path_prefix: str) -> None: self.api = api @@ -40,6 +34,12 @@ def __init__(self, api: "NinjaAPI", path_prefix: str) -> None: self.schemas: DictStrAny = {} self.securitySchemes: DictStrAny = {} self.all_operation_ids: Set = set() + self.schema_builder = SchemaBuilder(REF_TEMPLATE, self) + self.param_extractor = ParameterExtractor(REF_TEMPLATE, self) + self.request_body_builder = RequestBodyBuilder(BODY_CONTENT_TYPES, REF_TEMPLATE) + self.response_builder = ResponseBuilder() + self.operation_details_builder = OperationDetailBuilder(BODY_CONTENT_TYPES) + extra_info = api.openapi_extra.get("info", {}) super().__init__([ ("openapi", "3.1.0"), @@ -83,299 +83,20 @@ def methods(self, operations: list) -> DictStrAny: result = {} for op in operations: if op.include_in_schema: - operation_details = self.operation_details(op) + operation_details = self.operation_details_builder.build(op, self) for method in op.methods: result[method.lower()] = operation_details return result - def deep_dict_update( - self, main_dict: Dict[Any, Any], update_dict: Dict[Any, Any] - ) -> None: - for key in update_dict: - if ( - key in main_dict - and isinstance(main_dict[key], dict) - and isinstance(update_dict[key], dict) - ): - self.deep_dict_update( - main_dict[key], update_dict[key] - ) # pragma: no cover - elif ( - key in main_dict - and isinstance(main_dict[key], list) - and isinstance(update_dict[key], list) - ): - main_dict[key].extend(update_dict[key]) - else: - main_dict[key] = update_dict[key] - - def operation_details(self, operation: Operation) -> DictStrAny: - op_id = operation.operation_id or self.api.get_openapi_operation_id(operation) - if op_id in self.all_operation_ids: - print( - bold_red_style( - f'Warning: operation_id "{op_id}" is already used (Try giving a different name to: {operation.view_func.__module__}.{operation.view_func.__name__})' - ) - ) - self.all_operation_ids.add(op_id) - result = { - "operationId": op_id, - "summary": operation.summary, - "parameters": self.operation_parameters(operation), - "responses": self.responses(operation), - } - - if operation.description: - result["description"] = operation.description - - if operation.tags: - result["tags"] = operation.tags - - if operation.deprecated: - result["deprecated"] = operation.deprecated # type: ignore - - body = self.request_body(operation) - if body: - result["requestBody"] = body - - security = self.operation_security(operation) - if security: - result["security"] = security - - if operation.openapi_extra: - self.deep_dict_update(result, operation.openapi_extra) - - return result - - def operation_parameters(self, operation: Operation) -> List[DictStrAny]: - result = [] - for model in operation.models: - if model.__ninja_param_source__ not in BODY_CONTENT_TYPES: - result.extend(self._extract_parameters(model)) - return result - - def _extract_parameters(self, model: TModel) -> List[DictStrAny]: - result = [] - - schema = model.model_json_schema( - ref_template=REF_TEMPLATE, - schema_generator=NinjaGenerateJsonSchema, - ) - - required = set(schema.get("required", [])) - properties = schema["properties"] - - if "$defs" in schema: - self.add_schema_definitions(schema["$defs"]) - - for name, details in properties.items(): - is_required = name in required - p_name: str - p_schema: DictStrAny - p_required: bool - for p_name, p_schema, p_required in flatten_properties( - name, details, is_required, schema.get("$defs", {}) - ): - if not p_schema.get("include_in_schema", True): - continue - - param = { - "in": model.__ninja_param_source__, - "name": p_name, - "schema": p_schema, - "required": p_required, - } - - # copy description from schema description to param description - if "description" in p_schema: - param["description"] = p_schema["description"] - if "examples" in p_schema: - param["examples"] = p_schema["examples"] - elif "example" in p_schema: - param["example"] = p_schema["example"] - if "deprecated" in p_schema: - param["deprecated"] = p_schema["deprecated"] - - result.append(param) - - return result - - def _flatten_schema(self, model: TModel) -> DictStrAny: - params = self._extract_parameters(model) - flattened = { - "title": model.__name__, # type: ignore - "type": "object", - "properties": {p["name"]: p["schema"] for p in params}, - } - required = [p["name"] for p in params if p["required"]] - if required: - flattened["required"] = required - return flattened - - def _create_schema_from_model( - self, - model: TModel, - by_alias: bool = True, - remove_level: bool = True, - mode: JsonSchemaMode = "validation", - ) -> Tuple[DictStrAny, bool]: - if hasattr(model, "__ninja_flatten_map__"): - schema = self._flatten_schema(model) - else: - schema = model.model_json_schema( - ref_template=REF_TEMPLATE, - by_alias=by_alias, - schema_generator=NinjaGenerateJsonSchema, - mode=mode, - ).copy() - - # move Schemas from definitions - if schema.get("$defs"): - self.add_schema_definitions(schema.pop("$defs")) - - if remove_level and len(schema["properties"]) == 1: - name, details = list(schema["properties"].items())[0] - - # ref = details["$ref"] - required = name in schema.get("required", {}) - return details, required - else: - return schema, True - - def _create_multipart_schema_from_models( - self, - models: TModels, - mode: JsonSchemaMode = "validation", - ) -> Tuple[DictStrAny, str]: - # We have File and Form or Body, so we need to use multipart (File) - content_type = BODY_CONTENT_TYPES["file"] - - # get the various schemas - result = merge_schemas([ - self._create_schema_from_model(model, remove_level=False)[0] - for model in models - ]) - result["title"] = "MultiPartBodyParams" - - return result, content_type - - def request_body(self, operation: Operation) -> DictStrAny: - models = [ - m - for m in operation.models - if m.__ninja_param_source__ in BODY_CONTENT_TYPES - ] - if not models: - return {} - - if len(models) == 1: - model = models[0] - content_type = BODY_CONTENT_TYPES[model.__ninja_param_source__] - schema, required = self._create_schema_from_model( - model, - remove_level=model.__ninja_param_source__ == "body", - mode="validation", - ) - else: - schema, content_type = self._create_multipart_schema_from_models( - models, mode="validation" - ) - required = True - - return { - "content": {content_type: {"schema": schema}}, - "required": required, - } - - def responses(self, operation: Operation) -> Dict[int, DictStrAny]: - assert bool(operation.response_models), f"{operation.response_models} empty" - - result = {} - for status, model in operation.response_models.items(): - if status == Ellipsis: - continue # it's not yet clear what it means if user wants to output any other code - - description = responses.get(status, "Unknown Status Code") - details: Dict[int, Any] = {status: {"description": description}} - if model not in [None, NOT_SET]: - # ::TODO:: test this: by_alias == True - schema = self._create_schema_from_model( - model, by_alias=operation.by_alias, mode="serialization" - )[0] - details[status]["content"] = { - self.api.renderer.media_type: {"schema": schema} - } - result.update(details) - - return result - - def operation_security(self, operation: Operation) -> Optional[List[DictStrAny]]: - if not operation.auth_callbacks: - return None - result = [] - for auth in operation.auth_callbacks: - if hasattr(auth, "openapi_security_schema"): - scopes: List[DictStrAny] = [] # TODO: scopes - name = auth.__class__.__name__ - result.append({name: scopes}) # TODO: check if unique - self.securitySchemes[name] = auth.openapi_security_schema - return result - def get_components(self) -> DictStrAny: result = {"schemas": self.schemas} if self.securitySchemes: result["securitySchemes"] = self.securitySchemes return result - + def add_schema_definitions(self, definitions: dict) -> None: - # TODO: check if schema["definitions"] are unique - # if not - workaround (maybe use pydantic.schema.schema(models)) to process list of models - # assert set(definitions.keys()) - set(self.schemas.keys()) == set() - # ::TODO:: this is broken in interesting ways for by_alias, - # because same schema (name) can have different values self.schemas.update(definitions) - -def flatten_properties( - prop_name: str, - prop_details: DictStrAny, - prop_required: bool, - definitions: DictStrAny, -) -> Generator[Tuple[str, DictStrAny, bool], None, None]: - """ - extracts all nested model's properties into flat properties - (used f.e. in GET params with multiple arguments and models) - """ - if "allOf" in prop_details: - resolve_allOf(prop_details, definitions) - if len(prop_details["allOf"]) == 1 and "enum" in prop_details["allOf"][0]: - # is_required = "default" not in prop_details - yield prop_name, prop_details, prop_required - else: # pragma: no cover - # TODO: this code was for pydanitc 1.7+ ... <2.9 - check if this is still needed - for item in prop_details["allOf"]: - yield from flatten_properties("", item, True, definitions) - - elif "items" in prop_details and "$ref" in prop_details["items"]: - def_name = prop_details["items"]["$ref"].rsplit("/", 1)[-1] - prop_details["items"].update(definitions[def_name]) - del prop_details["items"]["$ref"] # seems num data is there so ref not needed - yield prop_name, prop_details, prop_required - - elif "$ref" in prop_details: - def_name = prop_details["$ref"].split("/")[-1] - definition = definitions[def_name] - yield from flatten_properties(prop_name, definition, prop_required, definitions) - - elif "properties" in prop_details: - required = set(prop_details.get("required", [])) - for k, v in prop_details["properties"].items(): - is_required = k in required - yield from flatten_properties(k, v, is_required, definitions) - else: - yield prop_name, prop_details, prop_required - - def resolve_allOf(details: DictStrAny, definitions: DictStrAny) -> None: """ resolves all $ref's in 'allOf' section @@ -384,20 +105,4 @@ def resolve_allOf(details: DictStrAny, definitions: DictStrAny) -> None: if "$ref" in item: def_name = item["$ref"].rsplit("/", 1)[-1] item.update(definitions[def_name]) - del item["$ref"] - - -def merge_schemas(schemas: List[DictStrAny]) -> DictStrAny: - result = schemas[0] - for scm in schemas[1:]: - result["properties"].update(scm["properties"]) - - required_list = result.get("required", []) - required_list.extend( - itertools.chain.from_iterable( - schema.get("required", ()) for schema in schemas[1:] - ) - ) - if required_list: - result["required"] = required_list - return result + del item["$ref"] \ No newline at end of file diff --git a/ninja/openapi/schema_builder.py b/ninja/openapi/schema_builder.py new file mode 100644 index 000000000..cf9e89920 --- /dev/null +++ b/ninja/openapi/schema_builder.py @@ -0,0 +1,52 @@ +from typing import Tuple +from ninja.params.models import TModel +from ninja.schema import NinjaGenerateJsonSchema +from ninja.types import DictStrAny +from pydantic.json_schema import JsonSchemaMode + +class SchemaBuilder: + def __init__(self, ref_template, open_api_schema): + self.ref_template = ref_template + self.open_api_schema = open_api_schema + + def _create_schema_from_model( + self, + model: TModel, + by_alias: bool = True, + remove_level: bool = True, + mode: JsonSchemaMode = "validation", + ) -> Tuple[DictStrAny, bool]: + if hasattr(model, "__ninja_flatten_map__"): + schema = self._flatten_schema(model) + else: + schema = model.model_json_schema( + ref_template=self.ref_template, + by_alias=by_alias, + schema_generator=NinjaGenerateJsonSchema, + mode=mode, + ).copy() + + # move Schemas from definitions + if schema.get("$defs"): + self.open_api_schema.add_schema_definitions(schema.pop("$defs")) + + if remove_level and len(schema["properties"]) == 1: + name, details = list(schema["properties"].items())[0] + + # ref = details["$ref"] + required = name in schema.get("required", {}) + return details, required + else: + return schema, True + + def _flatten_schema(self, model: TModel) -> DictStrAny: + params = self.open_api_schema.param_extractor._extract_parameters(model) + flattened = { + "title": model.__name__, # type: ignore + "type": "object", + "properties": {p["name"]: p["schema"] for p in params}, + } + required = [p["name"] for p in params if p["required"]] + if required: + flattened["required"] = required + return flattened \ No newline at end of file diff --git a/ninja/orm/fields.py b/ninja/orm/fields.py index d67814c8c..3d5c2c59e 100644 --- a/ninja/orm/fields.py +++ b/ninja/orm/fields.py @@ -118,66 +118,28 @@ def get_schema_field( field: DjangoField, *, depth: int = 0, optional: bool = False ) -> Tuple: "Returns pydantic field from django's model field" + alias = None - default = ... - default_factory = None - description = None - title = None max_length = None - nullable = False - python_type = None if field.is_relation: - if depth > 0: - return get_related_field_schema(field, depth=depth) - - internal_type = field.related_model._meta.pk.get_internal_type() + result = _handle_relation_field(field, depth=depth, optional=optional) - if not field.concrete and field.auto_created or field.null or optional: - default = None - nullable = True - - alias = getattr(field, "get_attname", None) and field.get_attname() - - pk_type = TYPES.get(internal_type, int) - if field.one_to_many or field.many_to_many: - m2m_type = create_m2m_link_type(pk_type) - python_type = List[m2m_type] # type: ignore + if isinstance(result, tuple) and len(result) == 5: + python_type, default, default_factory, nullable, alias = result else: - python_type = pk_type + return result else: - _f_name, _f_path, _f_pos, field_options = field.deconstruct() - blank = field_options.get("blank", False) - null = field_options.get("null", False) - max_length = field_options.get("max_length") - - internal_type = field.get_internal_type() - try: - python_type = TYPES[internal_type] - except KeyError as e: - msg = [ - f"Do not know how to convert django field '{internal_type}'.", - "Try from ninja.orm import register_field", - f"register_field('{internal_type}', )", - ] - raise ConfigError("\n".join(msg)) from e - - if field.primary_key or blank or null or optional: - default = None - nullable = True - - if field.has_default(): - if callable(field.default): - default_factory = field.default - else: - default = field.default + python_type, default, default_factory, nullable, max_length = _handle_normal_field( + field, optional=optional + ) if default_factory: default = PydanticUndefined if nullable: - python_type = Union[python_type, None] # aka Optional in 3.7+ + python_type = Union[python_type, None] description = field.help_text or None title = title_if_lower(field.verbose_name) @@ -197,6 +159,78 @@ def get_schema_field( ) +def _get_python_type(internal_type: str): + try: + return TYPES[internal_type] + except KeyError as e: + msg = [ + f"Do not know how to convert django field '{internal_type}'.", + "Try from ninja.orm import register_field", + f"register_field('{internal_type}', )", + ] + raise ConfigError("\n".join(msg)) from e + + +def _should_be_nullable(*flags: bool) -> bool: + return any(flags) + + +def _handle_relation_field(field, *, depth: int, optional: bool): + if depth > 0: + return get_related_field_schema(field, depth=depth) + + default = ... + default_factory = None + nullable = False + alias = None + + internal_type = field.related_model._meta.pk.get_internal_type() + + if not field.concrete and field.auto_created or field.null or optional: + default = None + nullable = True + + alias = getattr(field, "get_attname", None) and field.get_attname() + + pk_type = _get_python_type(internal_type) + + if field.one_to_many or field.many_to_many: + m2m_type = create_m2m_link_type(pk_type) + python_type = List[m2m_type] # type: ignore + else: + python_type = pk_type + + return python_type, default, default_factory, nullable, alias + + +def _handle_normal_field(field, *, optional: bool): + default = ... + default_factory = None + nullable = False + + _f_name, _f_path, _f_pos, field_options = field.deconstruct() + + blank = field_options.get("blank", False) + null = field_options.get("null", False) + max_length = field_options.get("max_length") + + internal_type = field.get_internal_type() + python_type = _get_python_type(internal_type) + + if _should_be_nullable(field.primary_key, blank, null, optional): + default = None + nullable = True + + if field.has_default(): + if callable(field.default): + default_factory = field.default + else: + default = field.default + + return python_type, default, default_factory, nullable, max_length + + + @no_type_check def get_related_field_schema(field: DjangoField, *, depth: int) -> Tuple[OpenAPISchema]: from ninja.orm import create_schema