|
1 | 1 | import os |
| 2 | +import re |
2 | 3 | from typing import List, Dict, Optional, TypeVar, Callable, Iterable, Any |
3 | 4 |
|
4 | 5 | from langchain.schema import Document |
@@ -87,32 +88,63 @@ def _get_column_info( |
87 | 88 | return column_info |
88 | 89 |
|
89 | 90 |
|
| 91 | +def _extract_dataset_name_from_urn(urn: str) -> Optional[str]: |
| 92 | + """URN 문자열에서 데이터셋 이름(예: delta.default.stg_gh_events)만 추출. |
| 93 | +
|
| 94 | + 지원 패턴: |
| 95 | + - dataset URN: urn:li:dataset:(urn:li:dataPlatform:dbt,delta.default.stg_gh_events,PROD) |
| 96 | + - schemaField URN: urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,delta.default.stg_gh_events,PROD),event_id) |
| 97 | + """ |
| 98 | + match = re.search( |
| 99 | + r"urn:li:dataset:\(urn:li:dataPlatform:[^,]+,([^,]+),[^)]+\)", urn |
| 100 | + ) |
| 101 | + if match: |
| 102 | + return match.group(1) |
| 103 | + return None |
| 104 | + |
| 105 | + |
90 | 106 | def get_info_from_db(max_workers: int = 8) -> List[Document]: |
91 | 107 | table_info = _get_table_info(max_workers=max_workers) |
92 | 108 |
|
93 | 109 | fetcher = _get_fetcher() |
94 | 110 | urns = list(fetcher.get_urns()) |
95 | 111 | urn_table_mapping = {} |
| 112 | + display_name_by_table = {} |
96 | 113 | for urn in urns: |
97 | | - table_name = fetcher.get_table_name(urn) |
98 | | - if table_name: |
99 | | - urn_table_mapping[table_name] = urn |
100 | | - |
101 | | - def process_table_info(item: tuple[str, str]) -> str: |
102 | | - table_name, table_description = item |
| 114 | + original_name = fetcher.get_table_name(urn) |
| 115 | + if original_name: |
| 116 | + urn_table_mapping[original_name] = urn |
| 117 | + parsed_name = _extract_dataset_name_from_urn(urn) |
| 118 | + if parsed_name: |
| 119 | + display_name_by_table[original_name] = parsed_name |
| 120 | + |
| 121 | + def process_table_info(item: tuple[str, str, str]) -> str: |
| 122 | + original_table_name, table_description, display_table_name = item |
| 123 | + # 컬럼 조회는 기존 테이블 이름으로 수행 (urn_table_mapping과 일치) |
103 | 124 | column_info = _get_column_info( |
104 | | - table_name, urn_table_mapping, max_workers=max_workers |
| 125 | + original_table_name, urn_table_mapping, max_workers=max_workers |
105 | 126 | ) |
106 | 127 | column_info_str = "\n".join( |
107 | 128 | [ |
108 | 129 | f"{col['column_name']}: {col['column_description']}" |
109 | 130 | for col in column_info |
110 | 131 | ] |
111 | 132 | ) |
112 | | - return f"{table_name}: {table_description}\nColumns:\n {column_info_str}" |
| 133 | + used_name = display_table_name or original_table_name |
| 134 | + return f"{used_name}: {table_description}\nColumns:\n {column_info_str}" |
| 135 | + |
| 136 | + # 표시용 이름을 세 번째 파라미터로 함께 전달 |
| 137 | + items_with_display = [ |
| 138 | + ( |
| 139 | + name, |
| 140 | + desc, |
| 141 | + display_name_by_table.get(name, name), |
| 142 | + ) |
| 143 | + for name, desc in table_info.items() |
| 144 | + ] |
113 | 145 |
|
114 | 146 | table_info_str_list = parallel_process( |
115 | | - table_info.items(), |
| 147 | + items_with_display, |
116 | 148 | process_table_info, |
117 | 149 | max_workers=max_workers, |
118 | 150 | desc="컬럼 정보 수집 중", |
|
0 commit comments