|
2 | 2 | import filecmp |
3 | 3 | import json |
4 | 4 | import os |
5 | | -import re |
6 | 5 | import shlex |
7 | 6 | import shutil |
8 | 7 | import sys |
9 | 8 | import textwrap |
10 | 9 | import time |
11 | 10 | from concurrent.futures import Future, ThreadPoolExecutor, as_completed |
| 11 | +from io import StringIO |
12 | 12 | from pathlib import Path |
13 | 13 | from tempfile import gettempdir |
14 | 14 | from typing import Any |
|
18 | 18 | import requests |
19 | 19 | from cloup.constraints import If, IsSet, accept_none, require_one |
20 | 20 | from jinja2 import DebugUndefined, Environment, FileSystemLoader, meta |
| 21 | +from kubernetes.dynamic import DynamicClient |
21 | 22 | from packaging.version import Version |
22 | 23 | from pyhelper_utils.shell import run_command |
23 | 24 | from rich.console import Console |
| 25 | +from rich.panel import Panel |
24 | 26 | from rich.syntax import Syntax |
| 27 | +from rich.table import Table |
25 | 28 | from simple_logger.logger import get_logger |
26 | 29 |
|
27 | 30 | from ocp_resources.resource import Resource, get_client |
28 | | -from kubernetes.dynamic import DynamicClient |
| 31 | +from ocp_resources.utils.schema_validator import SchemaValidator |
| 32 | +from ocp_resources.utils.utils import convert_camel_case_to_snake_case |
29 | 33 |
|
| 34 | +# Set global logging |
| 35 | +LOGGER = get_logger(name="class_generator") |
30 | 36 | SPEC_STR: str = "SPEC" |
31 | 37 | FIELDS_STR: str = "FIELDS" |
32 | | -LOGGER = get_logger(name="class_generator") |
33 | 38 | TESTS_MANIFESTS_DIR: str = "class_generator/tests/manifests" |
34 | 39 | SCHEMA_DIR: str = "class_generator/schema" |
35 | 40 | RESOURCES_MAPPING_FILE: str = os.path.join(SCHEMA_DIR, "__resources-mappings.json") |
@@ -419,11 +424,6 @@ def generate_class_generator_command(resource: dict[str, Any]) -> str: |
419 | 424 |
|
420 | 425 | else: |
421 | 426 | # Human-readable format with Rich |
422 | | - from rich.console import Console |
423 | | - from rich.table import Table |
424 | | - from rich.panel import Panel |
425 | | - from io import StringIO |
426 | | - |
427 | 427 | # Create string buffer to capture Rich output |
428 | 428 | string_buffer = StringIO() |
429 | 429 | console = Console(file=string_buffer, force_terminal=True, width=120) |
@@ -594,8 +594,17 @@ def map_kind_to_namespaced(client: str, newer_cluster_version: bool, schema_defi |
594 | 594 | with open(not_kind_file, "w") as fd: |
595 | 595 | fd.writelines("\n".join(not_kind_list)) |
596 | 596 |
|
| 597 | + # Clear SchemaValidator cache so it reloads the updated files |
| 598 | + SchemaValidator.clear_cache() |
| 599 | + |
597 | 600 |
|
598 | 601 | def read_resources_mapping_file() -> dict[Any, Any]: |
| 602 | + """Read resources mapping using SchemaValidator for consistency""" |
| 603 | + # Try to use SchemaValidator first |
| 604 | + if SchemaValidator.load_mappings_data(): |
| 605 | + return SchemaValidator._mappings_data or {} |
| 606 | + |
| 607 | + # Fallback for cases where schema files don't exist yet (e.g., initial generation) |
599 | 608 | try: |
600 | 609 | with open(RESOURCES_MAPPING_FILE) as fd: |
601 | 610 | return json.load(fd) |
@@ -701,112 +710,8 @@ def update_kind_schema(): |
701 | 710 | client=client, newer_cluster_version=same_or_newer_version, schema_definition_file=ocp_openapi_json_file |
702 | 711 | ) |
703 | 712 |
|
704 | | - # Copy the resources mapping file to fake_kubernetes_client for the fake client to use |
705 | | - fake_client_mappings = Path("fake_kubernetes_client/__resources-mappings.json") |
706 | | - try: |
707 | | - shutil.copy2(RESOURCES_MAPPING_FILE, fake_client_mappings) |
708 | | - except (OSError, IOError): |
709 | | - # Don't fail the entire process if copy fails |
710 | | - pass |
711 | | - |
712 | | - |
713 | | -def convert_camel_case_to_snake_case(name: str) -> str: |
714 | | - """ |
715 | | - Converts a camel case string to snake case. |
716 | | -
|
717 | | - Args: |
718 | | - name (str): The camel case string to convert. |
719 | | -
|
720 | | - Returns: |
721 | | - str: The snake case representation of the input string. |
722 | | -
|
723 | | - Examples: |
724 | | - >>> convert_camel_case_to_snake_case(string_="allocateLoadBalancerNodePorts") |
725 | | - 'allocate_load_balancer_node_ports' |
726 | | - >>> convert_camel_case_to_snake_case(string_="clusterIPs") |
727 | | - 'cluster_ips' |
728 | | - >>> convert_camel_case_to_snake_case(string_="additionalCORSAllowedOS") |
729 | | - 'additional_cors_allowed_os' |
730 | | -
|
731 | | - Notes: |
732 | | - - This function assumes that the input string adheres to camel case conventions. |
733 | | - - If the input string contains acronyms (e.g., "XMLHttpRequest"), they will be treated as separate words |
734 | | - (e.g., "xml_http_request"). |
735 | | - - The function handles both single-word camel case strings (e.g., "Service") and multi-word camel case strings |
736 | | - (e.g., "myCamelCaseString"). |
737 | | - """ |
738 | | - do_not_process_list = ["oauth", "kubevirt"] |
739 | | - |
740 | | - # If the input string is in the do_not_proccess_list, return it as it is. |
741 | | - if name.lower() in do_not_process_list: |
742 | | - return name.lower() |
743 | | - |
744 | | - formatted_str: str = "" |
745 | | - |
746 | | - if name.islower(): |
747 | | - return name |
748 | | - |
749 | | - # For single words, e.g "Service" or "SERVICE" |
750 | | - if name.istitle() or name.isupper(): |
751 | | - return name.lower() |
752 | | - |
753 | | - # To decide if underscore is needed before a char, keep the last char format. |
754 | | - # If previous char is uppercase, underscode should not be added. Also applied for the first char in the string. |
755 | | - last_capital_char: bool | None = None |
756 | | - |
757 | | - # To decide if there are additional words ahead; if found, there is at least one more word ahead, else this is the |
758 | | - # last word. Underscore should be added before it and all chars from here should be lowercase. |
759 | | - following_capital_chars: re.Match | None = None |
760 | | - |
761 | | - str_len_for_idx_check = len(name) - 1 |
762 | | - |
763 | | - for idx, char in enumerate(name): |
764 | | - # If lower case, append to formatted string |
765 | | - if char.islower(): |
766 | | - formatted_str += char |
767 | | - last_capital_char = False |
768 | | - |
769 | | - # If first char is uppercase |
770 | | - elif idx == 0: |
771 | | - formatted_str += char.lower() |
772 | | - last_capital_char = True |
773 | | - |
774 | | - else: |
775 | | - if idx < str_len_for_idx_check: |
776 | | - following_capital_chars = re.search(r"[A-Z]", "".join(name[idx + 1 :])) |
777 | | - if last_capital_char: |
778 | | - if idx < str_len_for_idx_check and name[idx + 1].islower(): |
779 | | - if following_capital_chars: |
780 | | - formatted_str += f"_{char.lower()}" |
781 | | - last_capital_char = True |
782 | | - continue |
783 | | - |
784 | | - remaining_str = "".join(name[idx:]) |
785 | | - # The 2 letters in the string; uppercase char followed by lowercase char. |
786 | | - # Example: `clusterIPs`, handle `Ps` at this point |
787 | | - if idx + 1 == str_len_for_idx_check: |
788 | | - formatted_str += remaining_str.lower() |
789 | | - break |
790 | | - |
791 | | - # The last word in the string; uppercase followed by multiple lowercase chars |
792 | | - # Example: `dataVolumeTTLSeconds`, handle `Seconds` at this point |
793 | | - elif remaining_str.istitle(): |
794 | | - formatted_str += f"_{remaining_str.lower()}" |
795 | | - break |
796 | | - |
797 | | - else: |
798 | | - formatted_str += char.lower() |
799 | | - last_capital_char = True |
800 | | - |
801 | | - else: |
802 | | - formatted_str += char.lower() |
803 | | - last_capital_char = True |
804 | | - |
805 | | - else: |
806 | | - formatted_str += f"_{char.lower()}" |
807 | | - last_capital_char = True |
808 | | - |
809 | | - return formatted_str |
| 713 | + # Clear SchemaValidator cache after updating schemas |
| 714 | + SchemaValidator.clear_cache() |
810 | 715 |
|
811 | 716 |
|
812 | 717 | def parse_user_code_from_file(file_path: str) -> tuple[str, str]: |
|
0 commit comments