diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 85560838..a88f707a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -84,6 +84,7 @@ repos: rev: v1.36.4 hooks: - id: djlint-reformat-jinja + exclude: ^src/gitingest/format/ - repo: https://github.com/igorshubovych/markdownlint-cli rev: v0.45.0 diff --git a/src/gitingest/entrypoint.py b/src/gitingest/entrypoint.py index f6b5c8c8..e3ecc9ee 100644 --- a/src/gitingest/entrypoint.py +++ b/src/gitingest/entrypoint.py @@ -15,6 +15,7 @@ from gitingest.clone import clone_repo from gitingest.config import MAX_FILE_SIZE from gitingest.ingestion import ingest_query +from gitingest.output_formatter import DefaultFormatter from gitingest.query_parser import parse_local_dir_path, parse_remote_repo from gitingest.utils.auth import resolve_token from gitingest.utils.compat_func import removesuffix @@ -44,12 +45,13 @@ async def ingest_async( include_submodules: bool = False, token: str | None = None, output: str | None = None, -) -> tuple[str, str, str]: +) -> str: """Ingest a source and process its contents. This function analyzes a source (URL or local path), clones the corresponding repository (if applicable), - and processes its files according to the specified query parameters. It returns a summary, a tree-like - structure of the files, and the content of the files. The results can optionally be written to an output file. + and processes its files according to the specified query parameters. It returns a single digest string. + + The output is generated lazily using a ContextV1 object and the DefaultFormatter class. Parameters ---------- @@ -79,11 +81,8 @@ async def ingest_async( Returns ------- - tuple[str, str, str] - A tuple containing: - - A summary string of the analyzed repository or directory. - - A tree-like string representation of the file structure. - - The content of the files in the repository or directory. + str + The full digest string. 
""" logger.info("Starting ingestion process", extra={"source": source}) @@ -138,14 +137,15 @@ async def ingest_async( _apply_gitignores(query) logger.info("Processing files and generating output") - summary, tree, content = ingest_query(query) if output: logger.debug("Writing output to file", extra={"output_path": output}) - await _write_output(tree, content=content, target=output) - + context = ingest_query(query) + formatter = DefaultFormatter() + digest = formatter.format(context, context.query) + await _write_output(digest, content=None, target=output) logger.info("Ingestion completed successfully") - return summary, tree, content + return digest def ingest( @@ -160,12 +160,13 @@ def ingest( include_submodules: bool = False, token: str | None = None, output: str | None = None, -) -> tuple[str, str, str]: +) -> str: """Provide a synchronous wrapper around ``ingest_async``. This function analyzes a source (URL or local path), clones the corresponding repository (if applicable), - and processes its files according to the specified query parameters. It returns a summary, a tree-like - structure of the files, and the content of the files. The results can optionally be written to an output file. + and processes its files according to the specified query parameters. It returns a single digest string. + + The output is generated lazily using a ContextV1 object and the DefaultFormatter class. Parameters ---------- @@ -195,11 +196,8 @@ def ingest( Returns ------- - tuple[str, str, str] - A tuple containing: - - A summary string of the analyzed repository or directory. - - A tree-like string representation of the file structure. - - The content of the files in the repository or directory. + str + The full digest string. 
See Also -------- @@ -208,7 +206,7 @@ def ingest( """ return asyncio.run( ingest_async( - source=source, + source, max_file_size=max_file_size, include_patterns=include_patterns, exclude_patterns=exclude_patterns, diff --git a/src/gitingest/format/DebugFormatter/Source.j2 b/src/gitingest/format/DebugFormatter/Source.j2 new file mode 100644 index 00000000..ecebd57b --- /dev/null +++ b/src/gitingest/format/DebugFormatter/Source.j2 @@ -0,0 +1,4 @@ +{{ SEPARATOR }} +DEBUG: {{ class_name }} +Fields: {{ fields_str }} +{{ SEPARATOR }} diff --git a/src/gitingest/format/DefaultFormatter/ContextV1.j2 b/src/gitingest/format/DefaultFormatter/ContextV1.j2 new file mode 100644 index 00000000..e9a211d7 --- /dev/null +++ b/src/gitingest/format/DefaultFormatter/ContextV1.j2 @@ -0,0 +1,11 @@ +# Generated using https://gitingest.com/{{ source.query.user_name }}/{{ source.query.repo_name }}{{ source.query.subpath }} + +Sources used: +{%- for src in source %} +- {{ src.name }}: {{ src.__class__.__name__ }} +{% endfor %} + +{%- for src in source.sources %} +{{ formatter.format(src, source.query) }} +{%- endfor %} +# End of https://gitingest.com/{{ source.query.user_name }}/{{ source.query.repo_name }}{{ source.query.subpath }} diff --git a/src/gitingest/format/DefaultFormatter/FileSystemDirectory.j2 b/src/gitingest/format/DefaultFormatter/FileSystemDirectory.j2 new file mode 100644 index 00000000..211ef932 --- /dev/null +++ b/src/gitingest/format/DefaultFormatter/FileSystemDirectory.j2 @@ -0,0 +1,7 @@ +{%- if source.depth == 0 %}{{ source.name }}: +{{ source.tree }} + +{% endif -%} +{%- for child in source.children -%} +{{ formatter.format(child, query) }} +{%- endfor -%} diff --git a/src/gitingest/format/DefaultFormatter/FileSystemFile.j2 b/src/gitingest/format/DefaultFormatter/FileSystemFile.j2 new file mode 100644 index 00000000..a62a4312 --- /dev/null +++ b/src/gitingest/format/DefaultFormatter/FileSystemFile.j2 @@ -0,0 +1,4 @@ +{{ SEPARATOR }} +{{ source.name }} +{{ SEPARATOR }} 
+{{ source.content }} diff --git a/src/gitingest/format/DefaultFormatter/FileSystemSymlink.j2 b/src/gitingest/format/DefaultFormatter/FileSystemSymlink.j2 new file mode 100644 index 00000000..b07ff641 --- /dev/null +++ b/src/gitingest/format/DefaultFormatter/FileSystemSymlink.j2 @@ -0,0 +1,3 @@ +{{ SEPARATOR }} +{{ source.name }}{% if source.target %} -> {{ source.target }}{% endif %} +{{ SEPARATOR }} diff --git a/src/gitingest/format/DefaultFormatter/GitRepository.j2 b/src/gitingest/format/DefaultFormatter/GitRepository.j2 new file mode 100644 index 00000000..d0cc8608 --- /dev/null +++ b/src/gitingest/format/DefaultFormatter/GitRepository.j2 @@ -0,0 +1,7 @@ +{%- if source.depth == 0 %}šŸ”— Git Repository: {{ source.name }} +{{ source.tree }} + +{% endif -%} +{%- for child in source.children -%} +{{ formatter.format(child, query) }} +{%- endfor -%} diff --git a/src/gitingest/format/SummaryFormatter/ContextV1.j2 b/src/gitingest/format/SummaryFormatter/ContextV1.j2 new file mode 100644 index 00000000..6d4fece7 --- /dev/null +++ b/src/gitingest/format/SummaryFormatter/ContextV1.j2 @@ -0,0 +1,5 @@ +Repository: {{ source.query.user_name }}/{{ source.query.repo_name }} +Commit: {{ source.query.commit }} +Files analyzed: {{ source.file_count }} + +Estimated tokens: {{ source.token_count }} diff --git a/src/gitingest/format/SummaryFormatter/FileSystemDirectory.j2 b/src/gitingest/format/SummaryFormatter/FileSystemDirectory.j2 new file mode 100644 index 00000000..cb4b6511 --- /dev/null +++ b/src/gitingest/format/SummaryFormatter/FileSystemDirectory.j2 @@ -0,0 +1,2 @@ +Directory structure: +{{ source.tree }} diff --git a/src/gitingest/ingestion.py b/src/gitingest/ingestion.py index 01a2c8f3..2afc7925 100644 --- a/src/gitingest/ingestion.py +++ b/src/gitingest/ingestion.py @@ -6,8 +6,8 @@ from typing import TYPE_CHECKING from gitingest.config import MAX_DIRECTORY_DEPTH, MAX_FILES, MAX_TOTAL_SIZE_BYTES -from gitingest.output_formatter import format_node -from gitingest.schemas 
import FileSystemNode, FileSystemNodeType, FileSystemStats +from gitingest.schemas import ContextV1, FileSystemNode, FileSystemStats +from gitingest.schemas.filesystem import FileSystemDirectory, FileSystemFile, FileSystemSymlink, GitRepository from gitingest.utils.ingestion_utils import _should_exclude, _should_include from gitingest.utils.logging_config import get_logger @@ -18,12 +18,18 @@ logger = get_logger(__name__) -def ingest_query(query: IngestionQuery) -> tuple[str, str, str]: +def _is_git_repository(path: Path) -> bool: + """Check if a directory contains a .git folder.""" + return (path / ".git").exists() + + +def ingest_query(query: IngestionQuery) -> ContextV1: """Run the ingestion process for a parsed query. - This is the main entry point for analyzing a codebase directory or single file. It processes the query - parameters, reads the file or directory content, and generates a summary, directory structure, and file content, - along with token estimations. + This is the main entry point for analyzing a codebase directory or single file. + + It processes the query parameters, reads the file or directory content, and returns + a ContextV1 object that can generate the final output digest on demand. Parameters ---------- @@ -32,8 +38,10 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]: Returns ------- - tuple[str, str, str] - A tuple containing the summary, directory structure, and file contents. + ContextV1 + A ContextV1 object representing the ingested file system nodes. + Use str(DefaultFormatter(context)) to get the summary, directory structure, + and file contents. 
Raises ------ @@ -70,11 +78,8 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]: relative_path = path.relative_to(query.local_path) - file_node = FileSystemNode( + file_node = FileSystemFile( name=path.name, - type=FileSystemNodeType.FILE, - size=path.stat().st_size, - file_count=1, path_str=str(relative_path), path=path, ) @@ -91,16 +96,21 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]: "file_size": file_node.size, }, ) - return format_node(file_node, query=query) - - logger.info("Processing directory", extra={"directory_path": str(path)}) + return ContextV1(sources=[file_node], query=query) - root_node = FileSystemNode( - name=path.name, - type=FileSystemNodeType.DIRECTORY, - path_str=str(path.relative_to(query.local_path)), - path=path, - ) + # Check if this is a git repository and create appropriate node type + if _is_git_repository(path): + root_node = GitRepository( + name=path.name, + path_str=str(path.relative_to(query.local_path)), + path=path, + ) + else: + root_node = FileSystemDirectory( + name=path.name, + path_str=str(path.relative_to(query.local_path)), + path=path, + ) stats = FileSystemStats() @@ -117,10 +127,10 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]: }, ) - return format_node(root_node, query=query) + return ContextV1(sources=[root_node], query=query) -def _process_node(node: FileSystemNode, query: IngestionQuery, stats: FileSystemStats) -> None: +def _process_node(node: FileSystemNode, query: IngestionQuery, stats: FileSystemStats) -> None: # noqa: C901 """Process a file or directory item within a directory. 
This function handles each file or directory item, checking if it should be included or excluded based on the @@ -161,13 +171,21 @@ def _process_node(node: FileSystemNode, query: IngestionQuery, stats: FileSystem continue _process_file(path=sub_path, parent_node=node, stats=stats, local_path=query.local_path) elif sub_path.is_dir(): - child_directory_node = FileSystemNode( - name=sub_path.name, - type=FileSystemNodeType.DIRECTORY, - path_str=str(sub_path.relative_to(query.local_path)), - path=sub_path, - depth=node.depth + 1, - ) + # Check if this subdirectory is a git repository + if _is_git_repository(sub_path): + child_directory_node = GitRepository( + name=sub_path.name, + path_str=str(sub_path.relative_to(query.local_path)), + path=sub_path, + depth=node.depth + 1, + ) + else: + child_directory_node = FileSystemDirectory( + name=sub_path.name, + path_str=str(sub_path.relative_to(query.local_path)), + path=sub_path, + depth=node.depth + 1, + ) _process_node(node=child_directory_node, query=query, stats=stats) @@ -201,9 +219,8 @@ def _process_symlink(path: Path, parent_node: FileSystemNode, stats: FileSystemS The base path of the repository or directory being processed. """ - child = FileSystemNode( + child = FileSystemSymlink( name=path.name, - type=FileSystemNodeType.SYMLINK, path_str=str(path.relative_to(local_path)), path=path, depth=parent_node.depth + 1, @@ -213,7 +230,7 @@ def _process_symlink(path: Path, parent_node: FileSystemNode, stats: FileSystemS parent_node.file_count += 1 -def _process_file(path: Path, parent_node: FileSystemNode, stats: FileSystemStats, local_path: Path) -> None: +def _process_file(path: Path, parent_node: FileSystemDirectory, stats: FileSystemStats, local_path: Path) -> None: """Process a file in the file system. This function checks the file's size, increments the statistics, and reads its content. 
@@ -223,7 +240,7 @@ def _process_file(path: Path, parent_node: FileSystemNode, stats: FileSystemStat ---------- path : Path The full path of the file. - parent_node : FileSystemNode + parent_node : FileSystemDirectory The dictionary to accumulate the results. stats : FileSystemStats Statistics tracking object for the total file count and size. @@ -258,11 +275,8 @@ def _process_file(path: Path, parent_node: FileSystemNode, stats: FileSystemStat stats.total_files += 1 stats.total_size += file_size - child = FileSystemNode( + child = FileSystemFile( name=path.name, - type=FileSystemNodeType.FILE, - size=file_size, - file_count=1, path_str=str(path.relative_to(local_path)), path=path, depth=parent_node.depth + 1, diff --git a/src/gitingest/output_formatter.py b/src/gitingest/output_formatter.py index 5c2b59ae..a7bb726a 100644 --- a/src/gitingest/output_formatter.py +++ b/src/gitingest/output_formatter.py @@ -3,12 +3,16 @@ from __future__ import annotations import ssl +from functools import singledispatchmethod +from pathlib import Path from typing import TYPE_CHECKING import requests.exceptions import tiktoken +from jinja2 import Environment, FileSystemLoader, Template, TemplateNotFound -from gitingest.schemas import FileSystemNode, FileSystemNodeType +from gitingest.schemas import FileSystemNode, Source +from gitingest.schemas.filesystem import SEPARATOR, FileSystemNodeType from gitingest.utils.compat_func import readlink from gitingest.utils.logging_config import get_logger @@ -24,84 +28,6 @@ ] -def format_node(node: FileSystemNode, query: IngestionQuery) -> tuple[str, str, str]: - """Generate a summary, directory structure, and file contents for a given file system node. - - If the node represents a directory, the function will recursively process its contents. - - Parameters - ---------- - node : FileSystemNode - The file system node to be summarized. 
- query : IngestionQuery - The parsed query object containing information about the repository and query parameters. - - Returns - ------- - tuple[str, str, str] - A tuple containing the summary, directory structure, and file contents. - - """ - is_single_file = node.type == FileSystemNodeType.FILE - summary = _create_summary_prefix(query, single_file=is_single_file) - - if node.type == FileSystemNodeType.DIRECTORY: - summary += f"Files analyzed: {node.file_count}\n" - elif node.type == FileSystemNodeType.FILE: - summary += f"File: {node.name}\n" - summary += f"Lines: {len(node.content.splitlines()):,}\n" - - tree = "Directory structure:\n" + _create_tree_structure(query, node=node) - - content = _gather_file_contents(node) - - token_estimate = _format_token_count(tree + content) - if token_estimate: - summary += f"\nEstimated tokens: {token_estimate}" - - return summary, tree, content - - -def _create_summary_prefix(query: IngestionQuery, *, single_file: bool = False) -> str: - """Create a prefix string for summarizing a repository or local directory. - - Includes repository name (if provided), commit/branch details, and subpath if relevant. - - Parameters - ---------- - query : IngestionQuery - The parsed query object containing information about the repository and query parameters. - single_file : bool - A flag indicating whether the summary is for a single file (default: ``False``). - - Returns - ------- - str - A summary prefix string containing repository, commit, branch, and subpath details. 
- - """ - parts = [] - - if query.user_name: - parts.append(f"Repository: {query.user_name}/{query.repo_name}") - else: - # Local scenario - parts.append(f"Directory: {query.slug}") - - if query.tag: - parts.append(f"Tag: {query.tag}") - elif query.branch and query.branch not in ("main", "master"): - parts.append(f"Branch: {query.branch}") - - if query.commit: - parts.append(f"Commit: {query.commit}") - - if query.subpath != "/" and not single_file: - parts.append(f"Subpath: {query.subpath}") - - return "\n".join(parts) + "\n" - - def _gather_file_contents(node: FileSystemNode) -> str: """Recursively gather contents of all files under the given node. @@ -208,3 +134,198 @@ def _format_token_count(text: str) -> str | None: return f"{total_tokens / threshold:.1f}{suffix}" return str(total_tokens) + + +def generate_digest(context: Source) -> str: + """Generate a digest string from a Source object. + + This is a convenience function that uses the DefaultFormatter to format a Source. + + Parameters + ---------- + context : Source + The Source object containing sources and query information. + + Returns + ------- + str + The formatted digest string. 
+ + """ + formatter = DefaultFormatter() + return formatter.format(context, context.query) + + +class Formatter: + """Base formatter class.""" + + def __init__(self, template_subdir: str) -> None: + self.separator = SEPARATOR + template_dir = Path(__file__).parent / "format" / template_subdir + self.env = Environment(loader=FileSystemLoader(template_dir), autoescape=True) + + def _get_template_for_node(self, node: Source) -> Template: + """Get template based on node class name.""" + template_name = f"{node.__class__.__name__}.j2" + return self.env.get_template(template_name) + + +class DefaultFormatter(Formatter): + """Default formatter for rendering filesystem nodes using Jinja2 templates.""" + + def __init__(self) -> None: + super().__init__("DefaultFormatter") + + def format(self, source: Source, query: IngestionQuery) -> str: + """Format a source with the given query.""" + if query is None: + # Handle case where query is None (shouldn't happen in normal usage) + raise ValueError("ContextV1 must have a valid query object") + + # Calculate and set token count for ContextV1 + if hasattr(source, '_token_count'): + token_count = self._calculate_token_count(source) + source._token_count = token_count + # Also set token count in the extra dict + source.extra["token_count"] = token_count + + try: + return self._format_node(source, query) + except Exception as e: + # Log the error for debugging + import logging + logging.error(f"Error in DefaultFormatter: {e}") + raise + + def _calculate_token_count(self, source: Source) -> str: + """Calculate token count for the entire source.""" + # Gather all content from the source + content = self._gather_all_content(source) + return _format_token_count(content) or "Unknown" + + def _gather_all_content(self, node: Source) -> str: + """Recursively gather all content from the source tree.""" + content_parts = [] + + # Add content from the current node + if hasattr(node, 'content'): + content_parts.append(node.content) + + # Add 
content from all sources if it's a ContextV1 + if hasattr(node, 'sources'): + for source in node.sources: + content_parts.append(self._gather_all_content(source)) + + # Add content from children if it's a directory + if hasattr(node, 'children'): + for child in node.children: + content_parts.append(self._gather_all_content(child)) + + return "\n".join(filter(None, content_parts)) + + @singledispatchmethod + def _format_node(self, node: Source, query: IngestionQuery) -> str: + """Dynamically format any node type based on available templates.""" + try: + template = self._get_template_for_node(node) + # Provide common template variables + context_vars = { + "source": node, + "query": query, + "formatter": self, + "SEPARATOR": SEPARATOR, + } + + return template.render(**context_vars) + except TemplateNotFound: + # Fallback: return content if available, otherwise empty string + return f"{getattr(node, 'content', '')}" + + +class DebugFormatter(Formatter): + """Debug formatter that shows detailed information about filesystem nodes.""" + + def __init__(self) -> None: + super().__init__("DebugFormatter") + + def _get_template_for_node(self, node: Source) -> Template: + """Get template based on node class name.""" + template_name = f"{node.__class__.__name__}.j2" + return self.env.get_template(template_name) + + def format(self, node: Source, query: IngestionQuery) -> str: + """Dynamically format any node type with debug information.""" + try: + # Get the actual class name + class_name = node.__class__.__name__ + + # Get all field names (both from dataclass fields and regular attributes) + field_names = [] + + # Try to get dataclass fields first + def _raise_no_dataclass_fields() -> None: + msg = "No dataclass fields found" + raise AttributeError(msg) + + try: + if hasattr(node, "__dataclass_fields__") and hasattr(node.__dataclass_fields__, "keys"): + field_names.extend(node.__dataclass_fields__.keys()) + else: + _raise_no_dataclass_fields() # Fall through to backup method 
+ except (AttributeError, TypeError): + # Fall back to getting all non-private attributes + field_names = [ + attr for attr in dir(node) if not attr.startswith("_") and not callable(getattr(node, attr, None)) + ] + + # Format the debug output + fields_str = ", ".join(field_names) + + # Try to get specific template, fallback to Source.j2 + try: + template = self._get_template_for_node(node) + except TemplateNotFound: + template = self.env.get_template("Source.j2") + + return template.render( + SEPARATOR=SEPARATOR, + class_name=class_name, + fields_str=fields_str, + node=node, + query=query, + formatter=self, + ) + except TemplateNotFound: + # Ultimate fallback + return f"DEBUG: {node.__class__.__name__}" + + +class SummaryFormatter(Formatter): + """Dedicated formatter for generating summaries of filesystem nodes.""" + + def __init__(self) -> None: + super().__init__("SummaryFormatter") + + def format(self, source: Source, query: IngestionQuery) -> str: + """Generate the summary output.""" + if query is None: + # Handle case where query is None (shouldn't happen in normal usage) + raise ValueError("ContextV1 must have a valid query object") + return self.summary(source, query) + + @singledispatchmethod + def summary(self, node: Source, query: IngestionQuery) -> str: + """Dynamically generate summary for any node type based on available templates.""" + try: + # Provide common template variables + context_vars = { + "source": node, + "query": query, + "formatter": self, + } + + template = self._get_template_for_node(node) + return template.render(**context_vars) + except TemplateNotFound: + # Fallback: return name if available + return f"{getattr(node, 'name', '')}" diff --git a/src/gitingest/query_parser.py b/src/gitingest/query_parser.py index dc4ccdef..c2de93d0 100644 --- a/src/gitingest/query_parser.py +++ b/src/gitingest/query_parser.py @@ -135,7 +135,13 @@ def parse_local_dir_path(path_str: str) -> IngestionQuery: """ path_obj = Path(path_str).resolve() slug = 
path_obj.name if path_str == "." else path_str.strip("/") - return IngestionQuery(local_path=path_obj, slug=slug, id=uuid.uuid4()) + return IngestionQuery( + local_path=path_obj, + slug=slug, + id=uuid.uuid4(), + user_name="local", # Set a default value for local paths + repo_name=slug, # Use the slug as the repo name for local paths + ) async def _configure_branch_or_tag( diff --git a/src/gitingest/schemas/__init__.py b/src/gitingest/schemas/__init__.py index db5cb12f..8a05ccf3 100644 --- a/src/gitingest/schemas/__init__.py +++ b/src/gitingest/schemas/__init__.py @@ -1,7 +1,27 @@ """Module containing the schemas for the Gitingest package.""" from gitingest.schemas.cloning import CloneConfig -from gitingest.schemas.filesystem import FileSystemNode, FileSystemNodeType, FileSystemStats +from gitingest.schemas.contextv1 import ContextV1 +from gitingest.schemas.filesystem import ( + FileSystemDirectory, + FileSystemFile, + FileSystemNode, + FileSystemStats, + FileSystemSymlink, + GitRepository, + Source, +) from gitingest.schemas.ingestion import IngestionQuery -__all__ = ["CloneConfig", "FileSystemNode", "FileSystemNodeType", "FileSystemStats", "IngestionQuery"] +__all__ = [ + "CloneConfig", + "ContextV1", + "FileSystemDirectory", + "FileSystemFile", + "FileSystemNode", + "FileSystemStats", + "FileSystemSymlink", + "GitRepository", + "IngestionQuery", + "Source", +] diff --git a/src/gitingest/schemas/contextv1.py b/src/gitingest/schemas/contextv1.py new file mode 100644 index 00000000..99e598bb --- /dev/null +++ b/src/gitingest/schemas/contextv1.py @@ -0,0 +1,81 @@ +"""Schema for ContextV1 objects used in formatting.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Iterator + +from gitingest.schemas.filesystem import FileSystemDirectory, FileSystemNode, Source + +if TYPE_CHECKING: + from gitingest.schemas import IngestionQuery + + +@dataclass +class ContextV1(Source): + 
"""The ContextV1 object is an object that contains all information needed to produce a formatted output. + + This object contains all information needed to produce a formatted output + similar to the "legacy" output. + + Attributes + ---------- + sources : list[Source] + List of source objects (files, directories, etc.) + query : IngestionQuery + The query context. + + """ + + sources: list[Source] = field(default_factory=list) + query: IngestionQuery = field(default=None) + + # Source fields + name: str = "context" + path_str: str = "" + path: Path = Path() + _token_count: str = "" + + def render_tree(self, prefix: str = "", *, is_last: bool = True) -> list[str]: + """Render the tree representation of this source.""" + # Return a simple tree representation for ContextV1 + return [f"{prefix}ContextV1: {len(self.sources)} sources"] + + @property + def sources_by_type(self) -> dict[str, list[Source]]: + """Return sources grouped by their class name.""" + result = {} + for source in self.sources: + class_name = source.__class__.__name__ + if class_name not in result: + result[class_name] = [] + result[class_name].append(source) + return result + + def __getitem__(self, key: str) -> list[Source]: + """Allow dict-like access to sources by type name.""" + sources_dict = self.sources_by_type + if key not in sources_dict: + error_msg = f"No sources of type '{key}' found" + raise KeyError(error_msg) + return sources_dict[key] + + def __iter__(self) -> Iterator[Source]: + """Allow iteration over all sources.""" + return iter(self.sources) + + @property + def file_count(self) -> int: + """Calculate total file count based on sources.""" + # No need to iterate on children, directories are already aware of their + # file count + total = 0 + for source in self.sources: + if isinstance(source, FileSystemDirectory): + # For directories, add their file_count + total += source.file_count + elif isinstance(source, FileSystemNode): + # For individual files/nodes, increment by 1 + total 
+= 1 + return total diff --git a/src/gitingest/schemas/filesystem.py b/src/gitingest/schemas/filesystem.py index cc66e7b1..e6ceff64 100644 --- a/src/gitingest/schemas/filesystem.py +++ b/src/gitingest/schemas/filesystem.py @@ -2,18 +2,15 @@ from __future__ import annotations -import os +from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum, auto from typing import TYPE_CHECKING -from gitingest.utils.compat_func import readlink -from gitingest.utils.file_utils import _decodes, _get_preferred_encodings, _read_chunk -from gitingest.utils.notebook import process_notebook - if TYPE_CHECKING: from pathlib import Path + SEPARATOR = "=" * 48 # Tiktoken, the tokenizer openai uses, counts 2 tokens if we have more than 48 @@ -34,48 +31,68 @@ class FileSystemStats: @dataclass -class FileSystemNode: # pylint: disable=too-many-instance-attributes - """Class representing a node in the file system (either a file or directory). +class Source(ABC): + """Abstract base class for all sources (files, directories, etc).""" - Tracks properties of files/directories for comprehensive analysis. - """ + metadata: dict = field(default_factory=dict) + extra: dict = field(default_factory=dict) - name: str - type: FileSystemNodeType - path_str: str - path: Path - size: int = 0 - file_count: int = 0 - dir_count: int = 0 + @abstractmethod + def render_tree(self, prefix: str = "", *, is_last: bool = True) -> list[str]: + """Render the tree representation of this source.""" + + +@dataclass +class FileSystemNode(Source): + """Base class for filesystem nodes (files, directories, symlinks).""" + + name: str = "" + path_str: str = "" + path: Path = None # type: ignore depth: int = 0 - children: list[FileSystemNode] = field(default_factory=list) + size: int = 0 - def sort_children(self) -> None: - """Sort the children nodes of a directory according to a specific order. 
+ @property + def tree(self) -> str: + """Return the name of this node.""" + return self.name + + +@dataclass +class FileSystemFile(FileSystemNode): + """Represents a file in the filesystem.""" - Order of sorting: - 2. Regular files (not starting with dot) - 3. Hidden files (starting with dot) - 4. Regular directories (not starting with dot) - 5. Hidden directories (starting with dot) + @property + def content(self) -> str: + """Read and return the content of the file.""" + # read the file + try: + return self.path.read_text(encoding="utf-8") + except Exception as e: + return f"Error reading content of {self.name}: {e}" - All groups are sorted alphanumerically within themselves. + def render_tree(self, prefix: str = "", *, is_last: bool = True) -> list[str]: + """Render the tree representation of this file.""" + current_prefix = "└── " if is_last else "ā”œā”€ā”€ " + return [f"{prefix}{current_prefix}{self.name}"] - Raises - ------ - ValueError - If the node is not a directory. - """ - if self.type != FileSystemNodeType.DIRECTORY: - msg = "Cannot sort children of a non-directory node" - raise ValueError(msg) +@dataclass +class FileSystemDirectory(FileSystemNode): + """Represents a directory in the filesystem.""" + + children: list[FileSystemNode] = field(default_factory=list) + file_count: int = 0 + dir_count: int = 0 + file_count_total: int = 0 + type: FileSystemNodeType = FileSystemNodeType.DIRECTORY + + def sort_children(self) -> None: + """Sort the children nodes of a directory according to a specific order.""" def _sort_key(child: FileSystemNode) -> tuple[int, str]: - # returns the priority order for the sort function, 0 is first - # Groups: 0=README, 1=regular file, 2=hidden file, 3=regular dir, 4=hidden dir name = child.name.lower() - if child.type == FileSystemNodeType.FILE: + if hasattr(child, "type") and getattr(child, "type", None) == FileSystemNodeType.FILE: if name == "readme" or name.startswith("readme."): return (0, name) return (1 if not 
name.startswith(".") else 2, name) @@ -83,79 +100,55 @@ def _sort_key(child: FileSystemNode) -> tuple[int, str]: self.children.sort(key=_sort_key) - @property - def content_string(self) -> str: - """Return the content of the node as a string, including path and content. - - Returns - ------- - str - A string representation of the node's content. - - """ - parts = [ - SEPARATOR, - f"{self.type.name}: {str(self.path_str).replace(os.sep, '/')}" - + (f" -> {readlink(self.path).name}" if self.type == FileSystemNodeType.SYMLINK else ""), - SEPARATOR, - f"{self.content}", - ] - - return "\n".join(parts) + "\n\n" + def render_tree(self, prefix: str = "", *, is_last: bool = True) -> list[str]: + """Render the tree representation of this directory.""" + lines = [] + current_prefix = "└── " if is_last else "ā”œā”€ā”€ " + display_name = self.name + "/" + lines.append(f"{prefix}{current_prefix}{display_name}") + if hasattr(self, "children") and self.children: + new_prefix = prefix + (" " if is_last else "│ ") + for i, child in enumerate(self.children): + is_last_child = i == len(self.children) - 1 + lines.extend(child.render_tree(prefix=new_prefix, is_last=is_last_child)) + return lines @property - def content(self) -> str: # pylint: disable=too-many-return-statements - """Return file content (if text / notebook) or an explanatory placeholder. - - Heuristically decides whether the file is text or binary by decoding a small chunk of the file - with multiple encodings and checking for common binary markers. + def tree(self) -> str: + """Return the tree representation of this directory.""" + return "\n".join(self.render_tree()) - Returns - ------- - str - The content of the file, or an error message if the file could not be read. - Raises - ------ - ValueError - If the node is a directory. 
- - """ - if self.type == FileSystemNodeType.DIRECTORY: - msg = "Cannot read content of a directory node" - raise ValueError(msg) - - if self.type == FileSystemNodeType.SYMLINK: - return "" # TODO: are we including the empty content of symlinks? - - if self.path.suffix == ".ipynb": # Notebook - try: - return process_notebook(self.path) - except Exception as exc: - return f"Error processing notebook: {exc}" - - chunk = _read_chunk(self.path) - - if chunk is None: - return "Error reading file" - - if chunk == b"": - return "[Empty file]" +@dataclass +class GitRepository(FileSystemDirectory): + """A directory that contains a .git folder, representing a Git repository.""" + + git_info: dict = field(default_factory=dict) # Store git metadata like branch, commit, etc. + + def render_tree(self, prefix: str = "", *, is_last: bool = True) -> list[str]: + """Render the tree representation of this git repository.""" + lines = [] + current_prefix = "└── " if is_last else "ā”œā”€ā”€ " + # Mark as git repo in the tree + display_name = f"{self.name}/ (git repository)" + lines.append(f"{prefix}{current_prefix}{display_name}") + if hasattr(self, "children") and self.children: + new_prefix = prefix + (" " if is_last else "│ ") + for i, child in enumerate(self.children): + is_last_child = i == len(self.children) - 1 + lines.extend(child.render_tree(prefix=new_prefix, is_last=is_last_child)) + return lines - if not _decodes(chunk, "utf-8"): - return "[Binary file]" - # Find the first encoding that decodes the sample - good_enc: str | None = next( - (enc for enc in _get_preferred_encodings() if _decodes(chunk, encoding=enc)), - None, - ) +@dataclass +class FileSystemSymlink(FileSystemNode): + """Represents a symbolic link in the filesystem.""" - if good_enc is None: - return "Error: Unable to decode file with available encodings" + target: str = "" + # Add symlink-specific fields if needed - try: - with self.path.open(encoding=good_enc) as fp: - return fp.read() - except (OSError,
UnicodeDecodeError) as exc: - return f"Error reading file with {good_enc!r}: {exc}" + def render_tree(self, prefix: str = "", *, is_last: bool = True) -> list[str]: + """Render the tree representation of this symlink.""" + current_prefix = "└── " if is_last else "ā”œā”€ā”€ " + display_name = f"{self.name} -> {self.target}" if self.target else self.name + return [f"{prefix}{current_prefix}{display_name}"] diff --git a/src/server/query_processor.py b/src/server/query_processor.py index 03f52f16..5b5dc226 100644 --- a/src/server/query_processor.py +++ b/src/server/query_processor.py @@ -8,6 +8,7 @@ from gitingest.clone import clone_repo from gitingest.ingestion import ingest_query +from gitingest.output_formatter import DefaultFormatter, SummaryFormatter from gitingest.query_parser import parse_remote_repo from gitingest.utils.git_utils import resolve_commit, validate_github_token from gitingest.utils.logging_config import get_logger @@ -301,29 +302,48 @@ async def process_query( raise RuntimeError(msg) try: - summary, tree, content = ingest_query(query) - digest_content = tree + "\n" + content - _store_digest_content(query, clone_config, digest_content, summary, tree, content) + context = ingest_query(query) + formatter = DefaultFormatter() + digest = formatter.format(context, context.query) + summary_formatter = SummaryFormatter() + summary = summary_formatter.format(context, context.query) + + # Store digest based on S3 configuration + if is_s3_enabled(): + # Upload to S3 instead of storing locally + s3_file_path = generate_s3_file_path( + source=query.url, + user_name=cast("str", query.user_name), + repo_name=cast("str", query.repo_name), + commit=query.commit, + subpath=query.subpath, + include_patterns=query.include_patterns, + ignore_patterns=query.ignore_patterns, + ) + s3_url = upload_to_s3( + content=formatter.format(context, context.query), s3_file_path=s3_file_path, ingest_id=query.id + ) + # Store S3 URL in query for later use + query.s3_url = s3_url + 
else: + # Store locally + local_txt_file = Path(clone_config.local_path).with_suffix(".txt") + logger.info("Writing digest to local file", extra={"file_path": str(local_txt_file)}) + with local_txt_file.open("w", encoding="utf-8") as f: + f.write(digest) + except Exception as exc: _print_error(query.url, exc, max_file_size, pattern_type, pattern) # Clean up repository even if processing failed _cleanup_repository(clone_config) return IngestErrorResponse(error=str(exc)) - if len(content) > MAX_DISPLAY_SIZE: - content = ( - f"(Files content cropped to {int(MAX_DISPLAY_SIZE / 1_000)}k characters, " - "download full ingest to see more)\n" + content[:MAX_DISPLAY_SIZE] + if len(digest) > MAX_DISPLAY_SIZE: + digest = ( + f"(Digest cropped to {int(MAX_DISPLAY_SIZE / 1_000)}k characters, " + "download full ingest to see more)\n" + digest[:MAX_DISPLAY_SIZE] ) - _print_success( - url=query.url, - max_file_size=max_file_size, - pattern_type=pattern_type, - pattern=pattern, - summary=summary, - ) - digest_url = _generate_digest_url(query) # Clean up the repository after successful processing @@ -334,8 +354,8 @@ async def process_query( short_repo_url=short_repo_url, summary=summary, digest_url=digest_url, - tree=tree, - content=content, + tree=context.sources[0].tree, # TODO: this is a hack to get the tree of the first source + content=digest, default_max_file_size=max_file_size, pattern_type=pattern_type, pattern=pattern, diff --git a/src/server/routers_utils.py b/src/server/routers_utils.py index 3eaf0e59..f0471375 100644 --- a/src/server/routers_utils.py +++ b/src/server/routers_utils.py @@ -2,14 +2,19 @@ from __future__ import annotations +import traceback from typing import Any from fastapi import status from fastapi.responses import JSONResponse +from gitingest.utils.logging_config import get_logger from server.models import IngestErrorResponse, IngestSuccessResponse, PatternType from server.query_processor import process_query +# Initialize logger for this module +logger 
= get_logger(__name__) + COMMON_INGEST_RESPONSES: dict[int | str, dict[str, Any]] = { status.HTTP_200_OK: {"model": IngestSuccessResponse, "description": "Successful ingestion"}, status.HTTP_400_BAD_REQUEST: {"model": IngestErrorResponse, "description": "Bad request or processing error"}, @@ -40,6 +45,8 @@ async def _perform_ingestion( ) if isinstance(result, IngestErrorResponse): + # Log stack trace for debugging + logger.error("Ingest processing failed", extra={"traceback": traceback.format_exc()}) # Return structured error response with 400 status code return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content=result.model_dump()) @@ -49,9 +56,13 @@ async def _perform_ingestion( except ValueError as ve: # Handle validation errors with 400 status code error_response = IngestErrorResponse(error=f"Validation error: {ve!s}") + # Log stack trace for debugging + logger.exception("Validation error during ingest", extra={"error": str(ve)}) return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content=error_response.model_dump()) except Exception as exc: # Handle unexpected errors with 500 status code error_response = IngestErrorResponse(error=f"Internal server error: {exc!s}") + # Log stack trace for debugging + logger.exception("Unexpected error during ingest", extra={"error": str(exc)}) return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=error_response.model_dump()) diff --git a/tests/test_output_formatter.py b/tests/test_output_formatter.py new file mode 100644 index 00000000..6668bc7b --- /dev/null +++ b/tests/test_output_formatter.py @@ -0,0 +1,322 @@ +"""Tests for the output_formatter module. + +These tests validate the formatting behavior of DefaultFormatter and StupidFormatter +for different FileSystemNode types (File, Directory, Symlink). 
+""" +# pylint: disable=redefined-outer-name # pytest fixtures are expected to redefine names + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest + +from gitingest.output_formatter import DebugFormatter, DefaultFormatter, SummaryFormatter +from gitingest.schemas import FileSystemDirectory, FileSystemFile, FileSystemSymlink, IngestionQuery +from gitingest.schemas.filesystem import FileSystemNodeType + + +@pytest.fixture +def mock_query() -> IngestionQuery: + """Create a mock IngestionQuery for testing.""" + query = Mock(spec=IngestionQuery) + query.user_name = "test_user" + query.repo_name = "test_repo" + query.slug = "test_slug" + query.branch = "main" + query.commit = "abc123" + query.subpath = "/" + query.tag = None + return query + + +@pytest.fixture +def mock_file_node() -> FileSystemFile: + """Create a mock FileSystemFile for testing.""" + file_node = Mock(spec=FileSystemFile) + file_node.name = "test_file.py" + file_node.path = Path("/fake/path/test_file.py") + file_node.path_str = "/fake/path/test_file.py" + file_node.content = "print('hello world')\nprint('test content')" + file_node.size = 100 + file_node.depth = 1 + file_node.type = FileSystemNodeType.FILE + return file_node + + +@pytest.fixture +def mock_directory_node() -> FileSystemDirectory: + """Create a mock FileSystemDirectory for testing.""" + dir_node = Mock(spec=FileSystemDirectory) + dir_node.name = "src" + dir_node.path = Path("/fake/path/src") + dir_node.path_str = "/fake/path/src" + dir_node.children = [] + dir_node.file_count = 2 + dir_node.dir_count = 1 + dir_node.size = 500 + dir_node.depth = 0 + dir_node.type = FileSystemNodeType.DIRECTORY + dir_node.tree = "src/\nā”œā”€ā”€ file1.py\n└── file2.py" + + +@pytest.fixture +def mock_symlink_node() -> FileSystemSymlink: + """Create a mock FileSystemSymlink for testing.""" + symlink_node = Mock(spec=FileSystemSymlink) + symlink_node.name = "link_to_file" + 
symlink_node.path = Path("/fake/path/link_to_file") + symlink_node.path_str = "/fake/path/link_to_file" + symlink_node.target = "target_file.py" + symlink_node.size = 0 + symlink_node.depth = 1 + symlink_node.type = FileSystemNodeType.SYMLINK + return symlink_node + + +class TestDefaultFormatter: + """Test cases for DefaultFormatter class.""" + + def test_init(self) -> None: + """Test DefaultFormatter initialization.""" + formatter = DefaultFormatter() + assert formatter.env is not None + assert formatter.format is not None + + def test_format_file_node(self, mock_file_node: FileSystemFile, mock_query: IngestionQuery) -> None: + """Test formatting a FileSystemFile node.""" + formatter = DefaultFormatter() + result = formatter.format(mock_file_node, mock_query) + + # Should contain separator, filename, and content + assert "================================================" in result + assert "test_file.py" in result + assert "print('hello world')" in result + assert "print('test content')" in result + + def test_format_directory_node(self, mock_directory_node: FileSystemDirectory, mock_query: IngestionQuery) -> None: + """Test formatting a FileSystemDirectory node.""" + # Create mock child nodes + child1 = Mock() + child2 = Mock() + mock_directory_node.children = [child1, child2] + + formatter = DefaultFormatter() + + # Mock the format method calls for children + with patch.object( + formatter, + "format", + side_effect=lambda node, _: f"formatted_{node.name}" if hasattr(node, "name") else "formatted_child", + ) as mock_format: + # Need to call the actual method for the directory node itself + mock_format.side_effect = None + formatter.format(mock_directory_node, mock_query) + + # Reset side effect and call again to test child formatting + mock_format.side_effect = lambda node, _: f"formatted_{getattr(node, 'name', 'child')}" + formatter.format(mock_directory_node, mock_query) + + def test_format_symlink_node( + self, + mock_symlink_node: FileSystemSymlink, + 
mock_query: IngestionQuery, + ) -> None: + """Test formatting a FileSystemSymlink node.""" + formatter = DefaultFormatter() + result = formatter.format(mock_symlink_node, mock_query) + + # Should contain separator, filename, and target + assert "================================================" in result + assert "link_to_file" in result + assert "target_file.py" in result + + def test_format_symlink_node_no_target( + self, + mock_symlink_node: FileSystemSymlink, + mock_query: IngestionQuery, + ) -> None: + """Test formatting a FileSystemSymlink node without target.""" + mock_symlink_node.target = "" + formatter = DefaultFormatter() + result = formatter.format(mock_symlink_node, mock_query) + + # Should contain separator and filename but no arrow + assert "================================================" in result + assert "link_to_file" in result + assert " -> " not in result + + +class TestSummaryFormatter: + """Test cases for SummaryFormatter class.""" + + def test_init(self) -> None: + """Test SummaryFormatter initialization.""" + formatter = SummaryFormatter() + assert formatter.env is not None + assert formatter.summary is not None + + def test_summary_directory_node( + self, + mock_directory_node: FileSystemDirectory, + mock_query: IngestionQuery, + ) -> None: + """Test summary generation for a FileSystemDirectory node.""" + formatter = SummaryFormatter() + result = formatter.summary(mock_directory_node, mock_query) + + assert "Directory structure:" in result + assert "src/" in result + assert "file1.py" in result + assert "file2.py" in result + + def test_summary_file_node_default( + self, + mock_file_node: FileSystemFile, + mock_query: IngestionQuery, + ) -> None: + """Test default summary for FileSystemFile node.""" + formatter = SummaryFormatter() + result = formatter.summary(mock_file_node, mock_query) + + # Should use default handler and return the name + assert "test_file.py" in result + + +class TestDebugFormatter: + """Test cases for DebugFormatter 
class.""" + + def test_init(self) -> None: + """Test DebugFormatter initialization.""" + formatter = DebugFormatter() + assert formatter.env is not None + assert formatter.format is not None + + def test_format_file_node_debug_info( + self, + mock_file_node: FileSystemFile, + mock_query: IngestionQuery, + ) -> None: + """Test that DebugFormatter shows debug info for FileSystemFile.""" + formatter = DebugFormatter() + result = formatter.format(mock_file_node, mock_query) + + # Should contain debug information + assert "================================================" in result + assert "DEBUG: FileSystemFile" in result + assert "Fields:" in result + # Should contain field names + assert "name" in result + assert "path" in result + assert "size" in result + + def test_format_directory_node_debug_info( + self, + mock_directory_node: FileSystemDirectory, + mock_query: IngestionQuery, + ) -> None: + """Test that DebugFormatter shows debug info for FileSystemDirectory.""" + formatter = DebugFormatter() + result = formatter.format(mock_directory_node, mock_query) + + # Should contain debug information + assert "DEBUG: FileSystemDirectory" in result + assert "Fields:" in result + assert "name" in result + assert "children" in result + + def test_format_symlink_node_debug_info( + self, + mock_symlink_node: FileSystemSymlink, + mock_query: IngestionQuery, + ) -> None: + """Test that DebugFormatter shows debug info for FileSystemSymlink.""" + formatter = DebugFormatter() + result = formatter.format(mock_symlink_node, mock_query) + + # Should contain debug information + assert "DEBUG: FileSystemSymlink" in result + assert "Fields:" in result + assert "name" in result + assert "target" in result + + def test_format_all_node_types_show_debug( + self, + mock_file_node: FileSystemFile, + mock_directory_node: FileSystemDirectory, + mock_symlink_node: FileSystemSymlink, + mock_query: IngestionQuery, + ) -> None: + """Test that DebugFormatter shows debug info for all node types.""" 
+ formatter = DebugFormatter() + + file_result = formatter.format(mock_file_node, mock_query) + dir_result = formatter.format(mock_directory_node, mock_query) + symlink_result = formatter.format(mock_symlink_node, mock_query) + + # All should contain debug headers + assert "DEBUG: FileSystemFile" in file_result + assert "DEBUG: FileSystemDirectory" in dir_result + assert "DEBUG: FileSystemSymlink" in symlink_result + + # All should contain field information + assert "Fields:" in file_result + assert "Fields:" in dir_result + assert "Fields:" in symlink_result + + def test_debug_formatter_vs_default_formatter( + self, + mock_file_node: FileSystemFile, + mock_query: IngestionQuery, + ) -> None: + """Test that DebugFormatter produces different output than DefaultFormatter.""" + default_formatter = DefaultFormatter() + debug_formatter = DebugFormatter() + + default_result = default_formatter.format(mock_file_node, mock_query) + debug_result = debug_formatter.format(mock_file_node, mock_query) + + # Results should be different + assert default_result != debug_result + + # Debug should contain debug info, default should not + assert "DEBUG:" in debug_result + assert "DEBUG:" not in default_result + + # Debug should show fields, default shows content + assert "Fields:" in debug_result + assert "Fields:" not in default_result + + +class TestFormatterEdgeCases: + """Test edge cases and error conditions.""" + + def test_format_unknown_node_type(self, mock_query: IngestionQuery) -> None: + """Test formatting with an unknown node type.""" + unknown_node = Mock() + unknown_node.name = "unknown" + + formatter = DefaultFormatter() + # Should fall back to default behavior + result = formatter.format(unknown_node, mock_query) + assert result is not None + + def test_format_node_without_name(self, mock_query: IngestionQuery) -> None: + """Test formatting a node without a name attribute.""" + nameless_node = Mock(spec=FileSystemFile) + # Remove name attribute + del nameless_node.name 
+ + formatter = DebugFormatter() + # Should handle gracefully (jinja template will show empty) + result = formatter.format(nameless_node, mock_query) + assert result is not None + + def test_format_with_none_query(self, mock_file_node: FileSystemFile) -> None: + """Test formatting with None query.""" + formatter = DefaultFormatter() + # Should handle None query gracefully + result = formatter.format(mock_file_node, None) + assert result is not None