Conversation
Walkthrough

Adds MCP integration.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant CLI
    participant MCP_Server
    participant ZnTrack_Pkg
    User->>CLI: zntrack-mcp (start)
    CLI->>MCP_Server: initialize mcp tools
    User->>MCP_Server: call tool (e.g., get_node_info, get_node_list)
    MCP_Server->>ZnTrack_Pkg: import nodes / load entrypoints / inspect classes
    ZnTrack_Pkg-->>MCP_Server: introspection data / docs
    MCP_Server-->>User: return results (JSON/markdown)
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
for more information, see https://pre-commit.ci
Actionable comments posted: 3
🔭 Outside diff range comments (1)
zntrack/mcp/server.py (1)
**139-152: Fix incorrect success message when running entire pipeline.**

When `node` is `None`, the success message incorrectly states "Node 'None' executed successfully."

```diff
 @mcp.tool
 def run_node(node: str | None = None) -> dict:
     """Run a specific node."""
     import subprocess

     if node is None:
         result = subprocess.run(["dvc", "repro"], capture_output=True, text=True)
     else:
         result = subprocess.run(["dvc", "repro", node], capture_output=True, text=True)

     if result.returncode != 0:
-        return {"error": f"Failed to run node '{node}': {result.stderr}"}
+        error_msg = f"Failed to run node '{node}': {result.stderr}" if node else f"Failed to run pipeline: {result.stderr}"
+        return {"error": error_msg}

-    return {"message": f"Node '{node}' executed successfully.", "output": result.stdout}
+    success_msg = f"Node '{node}' executed successfully." if node else "Pipeline executed successfully."
+    return {"message": success_msg, "output": result.stdout}
```
🧹 Nitpick comments (5)
zntrack/mcp/__init__.py (1)
**8-8: Line exceeds configured maximum length.**

This line is 94 characters long, exceeding the project's 90-character limit set in pyproject.toml.

```diff
-bunx @anthropic-ai/claude-code mcp add zntrack-server -- uv run --project "$(pwd)" zntrack-mcp
+bunx @anthropic-ai/claude-code mcp add zntrack-server \
+  -- uv run --project "$(pwd)" zntrack-mcp
```

zntrack/mcp/server.py (4)
**1-1: Remove redundant file comment.**

The `# server.py` comment is unnecessary as the filename is already clear.

```diff
-# server.py
```
**10-27: Consider adding field descriptions to Pydantic models.**

The models would benefit from field descriptions to clarify their purpose, especially for API documentation.

```diff
 class NodeDict(BaseModel):
-    id: dict[str, str]
+    id: dict[str, str] = Field(description="Node identifier attributes")

 class LinkDict(BaseModel):
-    source: dict[str, str]
-    target: dict[str, str]
+    source: dict[str, str] = Field(description="Source node identifier")
+    target: dict[str, str] = Field(description="Target node identifier")
     # Other optional edge attributes, e.g., weight
     # Same here: total=False for flexibility
```

Add the import:

```diff
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
```
**76-76: Line exceeds configured maximum length.**

This line is 111 characters long, exceeding the project's 90-character limit.

```diff
 def node_info(node: str) -> dict:
-    """Get information about a specific node in the workflow, including class source if it's a zntrack node."""
+    """Get information about a specific node in the workflow.
+
+    Includes class source if it's a zntrack node.
+    """
```
**1-152: Consider adding input validation for subprocess commands.**

While the current implementation avoids shell injection by using list arguments instead of `shell=True`, consider adding explicit validation for node names to ensure they don't contain unexpected characters that could cause issues.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (3)
- `pyproject.toml` (3 hunks)
- `zntrack/mcp/__init__.py` (1 hunks)
- `zntrack/mcp/server.py` (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
zntrack/mcp/__init__.py
8-8: Line too long (94 > 90)
(E501)
zntrack/mcp/server.py
76-76: Line too long (111 > 90)
(E501)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: pytest (3.10, ubuntu-latest)
- GitHub Check: pytest (3.11, ubuntu-latest)
- GitHub Check: pytest (3.12, ubuntu-latest)
- GitHub Check: pytest (3.13, ubuntu-latest)
- GitHub Check: benchmark
🔇 Additional comments (3)
zntrack/mcp/__init__.py (1)
**1-16: Well-structured module initialization.**

The module provides clear documentation with a practical example and correctly exposes the MCP server through `__all__`.

pyproject.toml (1)
**3-3: Appropriate configuration for MCP integration.**

The version bump, CLI entry point, and optional dependencies are correctly configured for the new MCP feature.
Also applies to: 25-25, 60-65
zntrack/mcp/server.py (1)
**74-124: Well-implemented with good error handling.**

The function properly uses `shlex` for safe command parsing and handles various error cases. Be aware that dynamic imports based on node commands could pose security risks if node data comes from untrusted sources.
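For readers unfamiliar with the pattern the reviewer praises here, a generic illustration of why `shlex` plus list-style `subprocess.run` arguments avoids shell injection (not the PR's code):

```python
import shlex
import subprocess
import sys

# shlex.split parses a shell-like string into an argv list without invoking a shell.
args = shlex.split('dvc repro "my node"')
print(args)  # ['dvc', 'repro', 'my node']

# With a list, each element is exactly one argv entry; shell metacharacters in
# a node name (e.g. "; rm -rf /") stay inert because no shell ever parses them.
result = subprocess.run(
    [sys.executable, "-c", "print(1 + 1)"], capture_output=True, text=True
)
print(result.stdout.strip())  # 2
```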
```python
@mcp.tool
def status(name: str | None = None) -> dict[str, list[dict]]:
    """Check if any Node in the workflow is not up-to-date."""
    if name:
        result = subprocess.run(
            ["dvc", "status", "--json", name], capture_output=True, text=True
        )
    else:
        result = subprocess.run(
            ["dvc", "status", "--json"], capture_output=True, text=True
        )
    return json.loads(result.stdout)
```
Add error handling for subprocess and JSON parsing.
The function should handle potential failures when running the DVC command or parsing JSON.
```diff
 @mcp.tool
 def status(name: str | None = None) -> dict[str, list[dict]]:
     """Check if any Node in the workflow is not up-to-date."""
     if name:
         result = subprocess.run(
             ["dvc", "status", "--json", name], capture_output=True, text=True
         )
     else:
         result = subprocess.run(
             ["dvc", "status", "--json"], capture_output=True, text=True
         )
-    return json.loads(result.stdout)
+
+    if result.returncode != 0:
+        return {"error": f"DVC status failed: {result.stderr}"}
+
+    try:
+        return json.loads(result.stdout)
+    except json.JSONDecodeError as e:
+        return {"error": f"Failed to parse DVC output: {e}"}
```
🤖 Prompt for AI Agents
In zntrack/mcp/server.py around lines 32 to 44, the status function lacks error
handling for subprocess execution and JSON parsing. Add try-except blocks to
catch exceptions from subprocess.run (e.g., CalledProcessError) and json.loads
(e.g., JSONDecodeError). Handle errors gracefully by logging or raising
informative exceptions to ensure the function does not fail silently or crash
unexpectedly.
```python
@mcp.tool
def graph() -> NodeLinkData:
    """Get the workflow graph in node-link format."""
    import dvc.api
    import znjson
    from dvc.stage import PipelineStage
    from networkx.readwrite import json_graph

    class PipelineStageConverter(znjson.ConverterBase):
        instance: type = PipelineStage
        representation: str = "dvc.stage.PipelineStage"

        def encode(self, obj: PipelineStage) -> str:
            return obj.addressing

        def decode(self, value: str) -> PipelineStage:
            raise NotImplementedError("Decoding PipelineStage is not implemented")

    fs = dvc.api.DVCFileSystem()
    graph = fs.repo.index.graph
    return json.loads(
        json.dumps(
            json_graph.node_link_data(graph, edges="edges"),
            cls=znjson.ZnEncoder.from_converters([PipelineStageConverter]),
        )
    )
```
🛠️ Refactor suggestion
Add error handling for DVC repository access.
The function should handle cases where the DVC repository is not initialized or accessible.
```diff
 @mcp.tool
 def graph() -> NodeLinkData:
     """Get the workflow graph in node-link format."""
     import dvc.api
     import znjson
     from dvc.stage import PipelineStage
     from networkx.readwrite import json_graph

     class PipelineStageConverter(znjson.ConverterBase):
         instance: type = PipelineStage
         representation: str = "dvc.stage.PipelineStage"

         def encode(self, obj: PipelineStage) -> str:
             return obj.addressing

         def decode(self, value: str) -> PipelineStage:
             raise NotImplementedError("Decoding PipelineStage is not implemented")

-    fs = dvc.api.DVCFileSystem()
-    graph = fs.repo.index.graph
-    return json.loads(
-        json.dumps(
-            json_graph.node_link_data(graph, edges="edges"),
-            cls=znjson.ZnEncoder.from_converters([PipelineStageConverter]),
+    try:
+        fs = dvc.api.DVCFileSystem()
+        graph = fs.repo.index.graph
+        return json.loads(
+            json.dumps(
+                json_graph.node_link_data(graph, edges="edges"),
+                cls=znjson.ZnEncoder.from_converters([PipelineStageConverter]),
+            )
         )
-    )
+    except Exception as e:
+        return NodeLinkData(
+            directed=True,
+            multigraph=False,
+            graph={"error": str(e)},
+            nodes=[],
+            edges=[]
+        )
```
🤖 Prompt for AI Agents
In zntrack/mcp/server.py around lines 46 to 72, the graph function lacks error
handling for cases when the DVC repository is not initialized or inaccessible.
Wrap the code that accesses the DVC repository and its graph in a try-except
block to catch exceptions related to repository access (such as
FileNotFoundError or dvc.exceptions.NoDvcRepoError). In the except block, handle
the error gracefully by logging an appropriate message or returning a meaningful
error response instead of letting the exception propagate.
```python
@mcp.tool
def node_results(node: str, attr: str) -> dict:
    """Get the value of a specific attribute from a zntrack node."""
    import zntrack

    instance = zntrack.from_rev(node)
    if not hasattr(instance, attr):
        return {"error": f"Node '{node}' does not have attribute '{attr}'."}

    value = getattr(instance, attr)
    return {"repr": repr(value), "type": str(type(value))}
```
🛠️ Refactor suggestion
Add error handling for node loading.
The function should handle cases where `zntrack.from_rev` fails.
```diff
 @mcp.tool
 def node_results(node: str, attr: str) -> dict:
     """Get the value of a specific attribute from a zntrack node."""
     import zntrack

-    instance = zntrack.from_rev(node)
-    if not hasattr(instance, attr):
-        return {"error": f"Node '{node}' does not have attribute '{attr}'."}
-
-    value = getattr(instance, attr)
-    return {"repr": repr(value), "type": str(type(value))}
+    try:
+        instance = zntrack.from_rev(node)
+    except Exception as e:
+        return {"error": f"Failed to load node '{node}': {e}"}
+
+    if not hasattr(instance, attr):
+        return {"error": f"Node '{node}' does not have attribute '{attr}'."}
+
+    value = getattr(instance, attr)
+    return {"repr": repr(value), "type": str(type(value))}
```
🤖 Prompt for AI Agents
In zntrack/mcp/server.py around lines 126 to 137, the function node_results
calls zntrack.from_rev without handling potential exceptions. Add a try-except
block around the call to zntrack.from_rev to catch any errors during node
loading, and return a dictionary with an error message describing the failure.
This will ensure the function gracefully handles cases where node loading fails.
Actionable comments posted: 1
♻️ Duplicate comments (3)
zntrack/mcp/server.py (3)
**32-44: Add error handling for subprocess and JSON parsing.**

The function should handle potential failures when running the DVC command or parsing JSON.

```diff
 @mcp.tool
 def status(name: str | None = None) -> dict[str, list[dict]]:
     """Check if any Node in the workflow is not up-to-date."""
     if name:
         result = subprocess.run(
             ["dvc", "status", "--json", name], capture_output=True, text=True
         )
     else:
         result = subprocess.run(
             ["dvc", "status", "--json"], capture_output=True, text=True
         )
-    return json.loads(result.stdout)
+
+    if result.returncode != 0:
+        return {"error": f"DVC status failed: {result.stderr}"}
+
+    try:
+        return json.loads(result.stdout)
+    except json.JSONDecodeError as e:
+        return {"error": f"Failed to parse DVC output: {e}"}
```
**46-87: Add error handling for DVC repository access.**

The function should handle cases where the DVC repository is not initialized or accessible.

```diff
 @mcp.tool
 def graph() -> NodeLinkData:
     """Get the workflow graph in node-link format."""
     import dvc.api
     import znjson
     from dvc.stage import PipelineStage, Stage
     from networkx.readwrite import json_graph

     class PipelineStageConverter(znjson.ConverterBase):
         instance: type = PipelineStage
         representation: str = "dvc.stage.PipelineStage"

         def encode(self, obj: PipelineStage) -> str:
             return obj.addressing

         def decode(self, value: str) -> PipelineStage:
             raise NotImplementedError("Decoding PipelineStage is not implemented")

     class StageConverter(znjson.ConverterBase):
         instance: type = Stage
         representation: str = "dvc.stage.Stage"

         def encode(self, obj: Stage) -> str:
             return obj.addressing

         def decode(self, value: str) -> Stage:
             raise NotImplementedError("Decoding Stage is not implemented")

-    fs = dvc.api.DVCFileSystem()
-    graph = fs.repo.index.graph
-    return json.loads(
-        json.dumps(
-            json_graph.node_link_data(graph, edges="edges"),
-            cls=znjson.ZnEncoder.from_converters(
-                [PipelineStageConverter, StageConverter]
-            ),
+    try:
+        fs = dvc.api.DVCFileSystem()
+        graph = fs.repo.index.graph
+        return json.loads(
+            json.dumps(
+                json_graph.node_link_data(graph, edges="edges"),
+                cls=znjson.ZnEncoder.from_converters(
+                    [PipelineStageConverter, StageConverter]
+                ),
+            )
         )
-    )
+    except Exception as e:
+        return NodeLinkData(
+            directed=True,
+            multigraph=False,
+            graph={"error": str(e)},
+            nodes=[],
+            edges=[]
+        )
```
**143-154: Add error handling for node loading.**

The function should handle cases where `zntrack.from_rev` fails.

```diff
 @mcp.tool
 def node_results(node: str, attr: str) -> dict:
     """Get the value of a specific attribute from a zntrack node."""
     import zntrack

-    instance = zntrack.from_rev(node)
-    if not hasattr(instance, attr):
-        return {"error": f"Node '{node}' does not have attribute '{attr}'."}
-
-    value = getattr(instance, attr)
-    return {"repr": repr(value), "type": str(type(value))}
+    try:
+        instance = zntrack.from_rev(node)
+    except Exception as e:
+        return {"error": f"Failed to load node '{node}': {e}"}
+
+    if not hasattr(instance, attr):
+        return {"error": f"Node '{node}' does not have attribute '{attr}'."}
+
+    value = getattr(instance, attr)
+    return {"repr": repr(value), "type": str(type(value))}
```
🧹 Nitpick comments (2)
zntrack/mcp/server.py (2)
**74-75: TODO: Graph builder for path optimization.**

The TODO comment suggests implementing a graph builder to filter connections. Would you like me to create an issue to track this enhancement?
**159-159: Remove redundant import statement.**

The `subprocess` module is already imported at the top of the file (line 3).

```diff
-    import subprocess
-
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `zntrack/mcp/__init__.py` (1 hunks)
- `zntrack/mcp/server.py` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- `zntrack/mcp/__init__.py`
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: pytest (3.11, ubuntu-latest)
- GitHub Check: pytest (3.12, ubuntu-latest)
- GitHub Check: pytest (3.10, ubuntu-latest)
- GitHub Check: pytest (3.13, ubuntu-latest)
- GitHub Check: benchmark
🔇 Additional comments (1)
zntrack/mcp/server.py (1)
**161-168: Good error handling implementation.**

This function properly handles subprocess failures and returns appropriate error messages, which is a good pattern that should be applied to other functions in this file.
```python
fs = dvc.api.DVCFileSystem()
graph = fs.repo.index.graph
```
Add error handling for DVC repository access.
Similar to other functions, this should handle cases where the DVC repository is not accessible.
```diff
     # Load DVC graph
-    fs = dvc.api.DVCFileSystem()
-    graph = fs.repo.index.graph
+    try:
+        fs = dvc.api.DVCFileSystem()
+        graph = fs.repo.index.graph
+    except Exception as e:
+        return {"error": f"Failed to access DVC repository: {e}"}
```
🤖 Prompt for AI Agents
In zntrack/mcp/server.py around lines 101 to 103, the code accesses the DVC
repository without error handling, which can cause unhandled exceptions if the
repository is not accessible. Wrap the DVCFileSystem initialization and repo
graph access in a try-except block to catch exceptions related to repository
access, and handle these errors gracefully by logging an appropriate message or
returning a controlled error response.
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```
@@           Coverage Diff           @@
##             main     #929   +/-   ##
=======================================
  Coverage   88.49%   88.49%
=======================================
  Files          40       40
  Lines        2294     2294
=======================================
  Hits         2030     2030
  Misses        264      264
```

☔ View full report in Codecov by Sentry.
for more information, see https://pre-commit.ci
Actionable comments posted: 8
🧹 Nitpick comments (3)
zntrack/mcp/resources/node_getting_started.md (1)
**14-22: Improve readability by varying sentence structure.**

The documentation is comprehensive and helpful. Consider varying the sentence beginnings to improve readability, as several consecutive lines start with `- zntrack.`.

Apply this diff to improve the flow:

```diff
 Options include

 - `zntrack.deps()` for dependencies to other nodes.
 - `zntrack.deps_path` for dependencies to files.
 - `zntrack.params` for parameters.
-- `zntrack.params_path()` for parameter files (yaml / json)
-- `zntrack.outs_path()` for output files.
-- `zntrack.outs()` for outputs that are not files.
-- `zntrack.metrics()` dict output.
-- `zntrack.metrics_path()` for metrics files.
+- Parameter files (yaml / json) via `zntrack.params_path()`.
+- Output files using `zntrack.outs_path()`.
+- Generic outputs with `zntrack.outs()` for non-file outputs.
+- Dictionary metrics through `zntrack.metrics()`.
+- Metrics files via `zntrack.metrics_path()`.
```

zntrack/mcp/server.py (2)
**105-106: Address the TODO comment about interface checking correctness.**

The TODO comment indicates a known issue with the interface checking logic that may produce false positives.

Would you like me to help implement a more robust interface checking mechanism that properly validates Protocol implementations?
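As a rough sketch of that direction, `typing.runtime_checkable` allows `isinstance()` checks against a Protocol; note it only verifies that the named members exist, not their signatures, so some false positives remain (the `NodeProtocol` below is hypothetical, not zntrack's actual interface):

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class NodeProtocol(Protocol):
    """Illustrative protocol; the member name is an assumption, not zntrack's API."""

    def run(self) -> None: ...


class GoodNode:
    def run(self) -> None:
        print("running")


class BadNode:
    pass


# isinstance() with a runtime_checkable Protocol checks that the listed
# members exist on the object; it does not validate signatures or return types.
print(isinstance(GoodNode(), NodeProtocol))  # True
print(isinstance(BadNode(), NodeProtocol))   # False
```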
**145-154: Remove redundant import statement.**

The `get_registered_nodes` function is already imported at the module level (line 14).

```diff
 @mcp.tool
 def available_nodes() -> dict[str, list[str]]:
     """Get all available ZnTrack nodes in the current environment.

     The nodes are sorted by their module names.
     """
-    from zntrack.entrypoints import get_registered_nodes
-
     return get_registered_nodes()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (8)
- `pyproject.toml` (4 hunks)
- `zntrack/__init__.py` (1 hunks)
- `zntrack/entrypoints.py` (1 hunks)
- `zntrack/examples/__init__.py` (1 hunks)
- `zntrack/mcp/resources/graph_getting_started.md` (1 hunks)
- `zntrack/mcp/resources/graph_with_groups.md` (1 hunks)
- `zntrack/mcp/resources/node_getting_started.md` (1 hunks)
- `zntrack/mcp/server.py` (1 hunks)
✅ Files skipped from review due to trivial changes (2)
- zntrack/mcp/resources/graph_getting_started.md
- zntrack/mcp/resources/graph_with_groups.md
🧰 Additional context used
🪛 Ruff (0.12.2)
zntrack/mcp/server.py
105-105: Line too long (115 > 90)
(E501)
132-132: Line too long (95 > 90)
(E501)
213-213: Line too long (95 > 90)
(E501)
228-228: Line too long (95 > 90)
(E501)
zntrack/__init__.py
5-5: zntrack.entrypoints imported but unused; consider removing, adding to __all__, or using a redundant alias
(F401)
🪛 LanguageTool
zntrack/mcp/resources/node_getting_started.md
[style] ~18-~18: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...r parameters. - zntrack.params_path() for parameter files (yaml / json) - `zntrac...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
🔇 Additional comments (6)
zntrack/examples/__init__.py (1)
**60-62: LGTM! Function refactored to align with entry point discovery.**

The change from returning a list of Node instances to a dictionary keyed by module name aligns perfectly with the new entry point discovery mechanism introduced in `zntrack.entrypoints`. This standardized format enables consistent node discovery across the package.
**15-41: LGTM! Excellent error handling and robust implementation.**

The entry point discovery implementation is well-structured with comprehensive error handling at both the discovery and individual entry point levels. The use of `defaultdict` for aggregation and proper exception handling with logging ensures the function continues processing even when individual entry points fail to load.

pyproject.toml (4)
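The pattern described — aggregating into a `defaultdict` and logging rather than propagating load failures — can be sketched roughly as follows; the entry-point group name and exact signatures here are assumptions, not the PR's code:

```python
import importlib.metadata
import logging
from collections import defaultdict

log = logging.getLogger(__name__)


def get_registered_nodes(group: str = "zntrack.nodes") -> dict[str, list[str]]:
    """Collect node class names per module from a hypothetical entry-point group."""
    nodes: dict[str, list[str]] = defaultdict(list)
    try:
        entry_points = importlib.metadata.entry_points(group=group)
    except Exception as err:  # discovery itself failed
        log.warning("Entry point discovery failed: %s", err)
        return {}
    for ep in entry_points:
        try:
            obj = ep.load()
        except Exception as err:  # one broken plugin must not break the rest
            log.warning("Failed to load entry point %r: %s", ep.name, err)
            continue
        nodes[obj.__module__].append(obj.__name__)
    return dict(nodes)
```

The per-entry-point `try/except` is the key design choice: a single misbehaving third-party package degrades to a warning instead of making node discovery fail entirely.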
**3-3: LGTM! Version bump is appropriate.**

The version increment from 0.8.8 to 0.8.9 is appropriate for the new MCP functionality being added.

**11-11: LGTM! Python 3.11 requirement aligns with modern features.**

Raising the minimum Python requirement to 3.11 is reasonable and allows the use of modern Python features like improved typing syntax used in the codebase.

**29-29: LGTM! CLI entry point correctly configured.**

The new `zntrack-mcp` CLI entry point is properly configured to point to the MCP server's run function.

**64-68: LGTM! Optional MCP dependencies properly isolated.**

The optional `mcp` dependency group correctly isolates the MCP-related packages, allowing users to install only what they need.
```toml
[tool.uv.sources]
ipsuite = { path = "../ipsuite", editable = true }
massband = { path = "../massband", editable = true }
mlipx = { path = "../mlipx", editable = true }
cp2k-input-tools = { git = "https://github.com/cp2k/cp2k-input-tools.git" }
```
💡 Verification agent
🧩 Analysis chain
Verify UV sources configuration for production readiness.
The UV sources configuration uses local editable paths and git sources, which suggests this is configured for development. Ensure this configuration is appropriate for the intended deployment environment.
🏁 Script executed:
```shell
#!/bin/bash
# Description: Check if the local paths in UV sources exist and verify git repository accessibility
# Expected: Local paths should exist in development environment, git repo should be accessible
echo "Checking local paths:"
ls -la ../ipsuite 2>/dev/null && echo "✓ ipsuite path exists" || echo "✗ ipsuite path missing"
ls -la ../massband 2>/dev/null && echo "✓ massband path exists" || echo "✗ massband path missing"
ls -la ../mlipx 2>/dev/null && echo "✓ mlipx path exists" || echo "✗ mlipx path missing"
echo -e "\nChecking git repository accessibility:"
git ls-remote --heads https://github.com/cp2k/cp2k-input-tools.git >/dev/null 2>&1 && echo "✓ cp2k-input-tools git repo accessible" || echo "✗ cp2k-input-tools git repo not accessible"
```

Length of output: 523
Update UV sources for production readiness
The relative paths for ipsuite, massband, and mlipx do not exist in the sandbox and will break installs outside your local dev setup, whereas the cp2k-input-tools git source is reachable. Please replace those editable local‐path entries with stable package references (for example, a PyPI version or a git URL pinned to a release tag) or move them into a development-only profile.
• File: pyproject.toml, section [tool.uv.sources]
- Remove or comment out:

```toml
ipsuite = { path = "../ipsuite", editable = true }
massband = { path = "../massband", editable = true }
mlipx = { path = "../mlipx", editable = true }
```

- Add versioned or tagged references, e.g.:

```toml
ipsuite = { version = ">=1.2.0" }
massband = { git = "https://github.com/yourorg/massband.git", tag = "v0.4.1" }
mlipx = { version = "^2.3.0" }
cp2k-input-tools = { git = "https://github.com/cp2k/cp2k-input-tools.git" }
```
Ensure this aligns with your release process so that installing from pyproject.toml works in CI, production, and user setups.
🤖 Prompt for AI Agents
In pyproject.toml around lines 134 to 138, the local editable path entries for
ipsuite, massband, and mlipx under [tool.uv.sources] should be replaced with
stable package references to ensure production readiness. Remove or comment out
the lines using local paths and instead specify version constraints or git URLs
pinned to release tags for these packages. This change will make the
installation work reliably in CI, production, and user environments.
```diff
 import sys

-from zntrack import config
+from zntrack import config, entrypoints
```
🛠️ Refactor suggestion
**Add `entrypoints` to `__all__` or remove unused import.**

The `entrypoints` module is imported but not used within this file and not exported in `__all__`. Since this is a public API module, either add it to `__all__` for public export or remove the import if it's not intended to be part of the public API.
Apply this diff to add it to the public API:
```diff
 __all__ = [
     "params",
     "deps",
     "outs",
     "plots",
     "metrics",
     "params_path",
     "deps_path",
     "outs_path",
     "plots_path",
     "metrics_path",
     "Node",
     "Project",
     "nwd",
     "from_rev",
     "apply",
     "add",
     "field",
     "FieldTypes",
     "NOT_AVAILABLE",
     "config",
+    "entrypoints",
 ]
```

Or remove the unused import:

```diff
-from zntrack import config, entrypoints
+from zntrack import config
```
🧰 Tools
🪛 Ruff (0.12.2)
5-5: zntrack.entrypoints imported but unused; consider removing, adding to __all__, or using a redundant alias
(F401)
🤖 Prompt for AI Agents
In zntrack/__init__.py at line 5, the module 'entrypoints' is imported but not
used or exported in __all__. To fix this, either add 'entrypoints' to the
__all__ list to make it part of the public API or remove the import statement if
it is not intended for public use.
```python
def analyze_node(path: str, name: str) -> dict:
    module = importlib.import_module(path)
    node_class = getattr(module, name)
    docs = node_class.__doc__

    fields_by_type = defaultdict(list)
    type_hints = get_type_hints(node_class)

    # Collect dataclass field names
    dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
    for field in dataclasses.fields(node_class):
        field_type = field.metadata.get(FIELD_TYPE)
        if field_type is not None:
            hint = type_hints.get(field.name, None)
            fields_by_type[field_type].append((field.name, repr(hint)))

    # Collect other members
    methods = {
        "instance_methods": {},
        "class_methods": {},
        "static_methods": {},
        "properties": {},
    }

    for attr_name, attr in inspect.getmembers(node_class):
        if attr_name.startswith("__") or attr_name in dataclass_field_names:
            continue

        # Properties
        if isinstance(attr, property):
            return_type = get_type_hints(attr.fget).get("return", None)
            methods["properties"][attr_name] = repr(return_type)

        # Class methods
        elif inspect.ismethod(attr) and getattr(attr, "__self__", None) is node_class:
            return_type = get_type_hints(attr).get("return", None)
            methods["class_methods"][attr_name] = repr(return_type)

        # Static methods
        elif isinstance(inspect.getattr_static(node_class, attr_name), staticmethod):
            return_type = get_type_hints(attr).get("return", None)
            methods["static_methods"][attr_name] = repr(return_type)

        # Instance methods
        elif inspect.isfunction(attr):
            return_type = get_type_hints(attr).get("return", None)
            methods["instance_methods"][attr_name] = repr(return_type)

    return {
        "docs": docs,
        "name": name,
        "module": path,
        "fields": fields_by_type,
        "methods": methods,
    }
```
Add error handling for module import and type hint extraction.
The function lacks error handling for several operations that can fail:
- Module import and class retrieval (lines 20-21)
- Type hints extraction (line 25) which can raise exceptions for certain types
- Property fget access (line 49) which might be None
Apply this diff to add comprehensive error handling:
```diff
 def analyze_node(path: str, name: str) -> dict:
-    module = importlib.import_module(path)
-    node_class = getattr(module, name)
+    try:
+        module = importlib.import_module(path)
+        node_class = getattr(module, name)
+    except (ImportError, AttributeError) as e:
+        return {"error": f"Failed to load {path}.{name}: {e}"}
+
     docs = node_class.__doc__
     fields_by_type = defaultdict(list)
-    type_hints = get_type_hints(node_class)
+    try:
+        type_hints = get_type_hints(node_class)
+    except Exception:
+        type_hints = {}
     # Collect dataclass field names
     dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
     for field in dataclasses.fields(node_class):
         field_type = field.metadata.get(FIELD_TYPE)
         if field_type is not None:
             hint = type_hints.get(field.name, None)
             fields_by_type[field_type].append((field.name, repr(hint)))
     # Collect other members
     methods = {
         "instance_methods": {},
         "class_methods": {},
         "static_methods": {},
         "properties": {},
     }
     for attr_name, attr in inspect.getmembers(node_class):
         if attr_name.startswith("__") or attr_name in dataclass_field_names:
             continue
         # Properties
         if isinstance(attr, property):
-            return_type = get_type_hints(attr.fget).get("return", None)
+            if attr.fget:
+                try:
+                    return_type = get_type_hints(attr.fget).get("return", None)
+                except Exception:
+                    return_type = None
+            else:
+                return_type = None
             methods["properties"][attr_name] = repr(return_type)
```

📝 Committable suggestion
Original:

```python
def analyze_node(path: str, name: str) -> dict:
    module = importlib.import_module(path)
    node_class = getattr(module, name)
    docs = node_class.__doc__
    fields_by_type = defaultdict(list)
    type_hints = get_type_hints(node_class)
    # Collect dataclass field names
    dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
    for field in dataclasses.fields(node_class):
        field_type = field.metadata.get(FIELD_TYPE)
        if field_type is not None:
            hint = type_hints.get(field.name, None)
            fields_by_type[field_type].append((field.name, repr(hint)))
    # Collect other members
    methods = {
        "instance_methods": {},
        "class_methods": {},
        "static_methods": {},
        "properties": {},
    }
    for attr_name, attr in inspect.getmembers(node_class):
        if attr_name.startswith("__") or attr_name in dataclass_field_names:
            continue
        # Properties
        if isinstance(attr, property):
            return_type = get_type_hints(attr.fget).get("return", None)
            methods["properties"][attr_name] = repr(return_type)
        # Class methods
        elif inspect.ismethod(attr) and getattr(attr, "__self__", None) is node_class:
            return_type = get_type_hints(attr).get("return", None)
            methods["class_methods"][attr_name] = repr(return_type)
        # Static methods
        elif isinstance(inspect.getattr_static(node_class, attr_name), staticmethod):
            return_type = get_type_hints(attr).get("return", None)
            methods["static_methods"][attr_name] = repr(return_type)
        # Instance methods
        elif inspect.isfunction(attr):
            return_type = get_type_hints(attr).get("return", None)
            methods["instance_methods"][attr_name] = repr(return_type)
    return {
        "docs": docs,
        "name": name,
        "module": path,
        "fields": fields_by_type,
        "methods": methods,
    }
```

Suggested:

```python
def analyze_node(path: str, name: str) -> dict:
    try:
        module = importlib.import_module(path)
        node_class = getattr(module, name)
    except (ImportError, AttributeError) as e:
        return {"error": f"Failed to load {path}.{name}: {e}"}
    docs = node_class.__doc__
    fields_by_type = defaultdict(list)
    try:
        type_hints = get_type_hints(node_class)
    except Exception:
        type_hints = {}
    # Collect dataclass field names
    dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
    for field in dataclasses.fields(node_class):
        field_type = field.metadata.get(FIELD_TYPE)
        if field_type is not None:
            hint = type_hints.get(field.name, None)
            fields_by_type[field_type].append((field.name, repr(hint)))
    # Collect other members
    methods = {
        "instance_methods": {},
        "class_methods": {},
        "static_methods": {},
        "properties": {},
    }
    for attr_name, attr in inspect.getmembers(node_class):
        if attr_name.startswith("__") or attr_name in dataclass_field_names:
            continue
        # Properties
        if isinstance(attr, property):
            if attr.fget:
                try:
                    return_type = get_type_hints(attr.fget).get("return", None)
                except Exception:
                    return_type = None
            else:
                return_type = None
            methods["properties"][attr_name] = repr(return_type)
        # Class methods
        elif inspect.ismethod(attr) and getattr(attr, "__self__", None) is node_class:
            return_type = get_type_hints(attr).get("return", None)
            methods["class_methods"][attr_name] = repr(return_type)
        # Static methods
        elif isinstance(inspect.getattr_static(node_class, attr_name), staticmethod):
            return_type = get_type_hints(attr).get("return", None)
            methods["static_methods"][attr_name] = repr(return_type)
        # Instance methods
        elif inspect.isfunction(attr):
            return_type = get_type_hints(attr).get("return", None)
            methods["instance_methods"][attr_name] = repr(return_type)
    return {
        "docs": docs,
        "name": name,
        "module": path,
        "fields": fields_by_type,
        "methods": methods,
    }
```
🤖 Prompt for AI Agents
In zntrack/mcp/server.py from lines 19 to 74, add try-except blocks around the
module import and class retrieval to catch ImportError and AttributeError,
returning an appropriate error or empty result. Wrap the get_type_hints calls in
try-except to handle exceptions during type hint extraction, defaulting to empty
dict or None if errors occur. Also, before accessing attr.fget for properties,
check if fget is not None to avoid attribute errors. This will ensure the
function gracefully handles failures in dynamic imports and introspection.
```python
@mcp.resource("docs://graph/getting-started")
def graph_getting_started() -> str:
    """Get documentation on how to build a graph."""
    file = Path(__file__).parent / "resources" / "graph_getting_started.md"
    return file.read_text()


@mcp.resource("docs://graph/with-groups")
def graph_with_groups() -> str:
    """Get documentation on how to build a graph with groups."""
    file = Path(__file__).parent / "resources" / "graph_with_groups.md"
    return file.read_text()


@mcp.resource("docs://node/getting-started")
def node_getting_started() -> str:
    """Get documentation on how to use ZnTrack nodes."""
    file = Path(__file__).parent / "resources" / "node_getting_started.md"
    return file.read_text()
```
🛠️ Refactor suggestion
Add error handling for file reading operations.
The resource functions don't handle potential file reading errors.
Add error handling for file operations:
```diff
 @mcp.resource("docs://graph/getting-started")
 def graph_getting_started() -> str:
     """Get documentation on how to build a graph."""
     file = Path(__file__).parent / "resources" / "graph_getting_started.md"
-    return file.read_text()
+    try:
+        return file.read_text()
+    except FileNotFoundError:
+        return "Documentation file not found: graph_getting_started.md"
+    except Exception as e:
+        return f"Error reading documentation: {e}"
```

Apply similar changes to graph_with_groups() and node_getting_started() functions.
📝 Committable suggestion
Original:

```python
@mcp.resource("docs://graph/getting-started")
def graph_getting_started() -> str:
    """Get documentation on how to build a graph."""
    file = Path(__file__).parent / "resources" / "graph_getting_started.md"
    return file.read_text()


@mcp.resource("docs://graph/with-groups")
def graph_with_groups() -> str:
    """Get documentation on how to build a graph with groups."""
    file = Path(__file__).parent / "resources" / "graph_with_groups.md"
    return file.read_text()


@mcp.resource("docs://node/getting-started")
def node_getting_started() -> str:
    """Get documentation on how to use ZnTrack nodes."""
    file = Path(__file__).parent / "resources" / "node_getting_started.md"
    return file.read_text()
```

Suggested:

```python
@mcp.resource("docs://graph/getting-started")
def graph_getting_started() -> str:
    """Get documentation on how to build a graph."""
    file = Path(__file__).parent / "resources" / "graph_getting_started.md"
    try:
        return file.read_text()
    except FileNotFoundError:
        return "Documentation file not found: graph_getting_started.md"
    except Exception as e:
        return f"Error reading documentation: {e}"
```
🤖 Prompt for AI Agents
In zntrack/mcp/server.py around lines 184 to 203, the functions reading markdown
files do not handle potential file reading errors, which can cause unhandled
exceptions if files are missing or inaccessible. Add try-except blocks around
the file.read_text() calls in graph_getting_started(), graph_with_groups(), and
node_getting_started() to catch exceptions like FileNotFoundError or IOError,
and handle them gracefully, for example by returning an error message or logging
the issue.
Actionable comments posted: 0
♻️ Duplicate comments (9)
zntrack/mcp/server.py (9)
18-22: Add error handling for file reading operations.

The function doesn't handle potential file reading errors, which could cause unhandled exceptions if the file is missing or inaccessible. Apply this diff to add error handling:

```diff
 @mcp.tool
 def get_graph_getting_started() -> str:
     """Get documentation on how to build a graph."""
     file = Path(__file__).parent / "resources" / "graph_getting_started.md"
-    return file.read_text()
+    try:
+        return file.read_text()
+    except FileNotFoundError:
+        return "Documentation file not found: graph_getting_started.md"
+    except Exception as e:
+        return f"Error reading documentation: {e}"
```
25-29: Add error handling for file reading operations.

The function doesn't handle potential file reading errors. Apply this diff to add error handling:

```diff
 @mcp.tool
 def get_graph_with_groups() -> str:
     """Get documentation on how to build a graph with groups."""
     file = Path(__file__).parent / "resources" / "graph_with_groups.md"
-    return file.read_text()
+    try:
+        return file.read_text()
+    except FileNotFoundError:
+        return "Documentation file not found: graph_with_groups.md"
+    except Exception as e:
+        return f"Error reading documentation: {e}"
```
32-36: Add error handling for file reading operations.

The function doesn't handle potential file reading errors. Apply this diff to add error handling:

```diff
 @mcp.tool
 def get_node_getting_started() -> str:
     """Get documentation on how to use ZnTrack nodes."""
     file = Path(__file__).parent / "resources" / "node_getting_started.md"
-    return file.read_text()
+    try:
+        return file.read_text()
+    except FileNotFoundError:
+        return "Documentation file not found: node_getting_started.md"
+    except Exception as e:
+        return f"Error reading documentation: {e}"
```
104-108: Add error handling for module import and class retrieval.

The function properly handles import and attribute errors but lacks additional error handling for other operations that can fail.
118-126: Add error handling for type hints extraction.

The `get_type_hints` call can raise exceptions for certain types and should be wrapped in a try-except block. Apply this diff to add error handling:

```diff
-    type_hints = get_type_hints(node_class)
+    try:
+        type_hints = get_type_hints(node_class)
+    except Exception:
+        type_hints = {}
```
141-143: Add error handling for property type hint extraction.

The function should check if `attr.fget` is not None before accessing it and handle potential type hint extraction failures. Apply this diff to add error handling:

```diff
         # Properties
         if isinstance(attr, property):
-            return_type = get_type_hints(attr.fget).get("return", None)
+            if attr.fget:
+                try:
+                    return_type = get_type_hints(attr.fget).get("return", None)
+                except Exception:
+                    return_type = None
+            else:
+                return_type = None
             methods["properties"][attr_name] = repr(return_type)
```
146-148: Add error handling for method type hint extraction.

Type hint extraction can fail and should be wrapped in try-except blocks. Apply this diff:

```diff
         # Class methods
         elif inspect.ismethod(attr) and getattr(attr, "__self__", None) is node_class:
-            return_type = get_type_hints(attr).get("return", None)
+            try:
+                return_type = get_type_hints(attr).get("return", None)
+            except Exception:
+                return_type = None
             methods["class_methods"][attr_name] = repr(return_type)
```
151-153: Add error handling for static method type hint extraction.

Type hint extraction can fail and should be wrapped in try-except blocks. Apply this diff:

```diff
         # Static methods
         elif isinstance(inspect.getattr_static(node_class, attr_name), staticmethod):
-            return_type = get_type_hints(attr).get("return", None)
+            try:
+                return_type = get_type_hints(attr).get("return", None)
+            except Exception:
+                return_type = None
             methods["static_methods"][attr_name] = repr(return_type)
```
156-158: Add error handling for function type hint extraction.

Type hint extraction can fail and should be wrapped in try-except blocks. Apply this diff:

```diff
         # Instance methods
         elif inspect.isfunction(attr):
-            return_type = get_type_hints(attr).get("return", None)
+            try:
+                return_type = get_type_hints(attr).get("return", None)
+            except Exception:
+                return_type = None
             methods["instance_methods"][attr_name] = repr(return_type)
```
🧹 Nitpick comments (1)
zntrack/mcp/server.py (1)
169-172: Consider fixing line length for better readability.

The long string on Line 172 exceeds the 90-character line limit. Apply this diff to improve readability:

```diff
 @mcp.tool
 def get_docs_diffusion() -> str:
     """Additional information when computing self diffusion."""
-    return "It is important to run the simulation long enough. For water, at least 1 ns, for more viscous liquids, up to 50 ns or more "
+    return (
+        "It is important to run the simulation long enough. For water, at least 1 ns, "
+        "for more viscous liquids, up to 50 ns or more."
+    )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (3)
- `pyproject.toml` (4 hunks)
- `zntrack/entrypoints.py` (1 hunks)
- `zntrack/mcp/server.py` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- zntrack/entrypoints.py
- pyproject.toml
🧰 Additional context used
🧬 Code Graph Analysis (1)
zntrack/mcp/server.py (3)
zntrack/entrypoints.py (1)
- `get_registered_nodes` (15-42)

zntrack/examples/__init__.py (1)
- `nodes` (60-62)

zntrack/fields/base.py (1)
- `field` (23-104)
🪛 Ruff (0.12.2)
zntrack/mcp/server.py
172-172: Line too long (136 > 90)
(E501)
181-181: Line too long (108 > 90)
(E501)
182-182: Line too long (100 > 90)
(E501)
185-185: Line too long (97 > 90)
(E501)
255-255: Line too long (92 > 90)
(E501)
266-266: Line too long (117 > 90)
(E501)
293-293: Line too long (97 > 90)
(E501)
374-374: Line too long (97 > 90)
(E501)
389-389: Line too long (97 > 90)
(E501)
405-405: Line too long (92 > 90)
(E501)
417-417: Line too long (92 > 90)
(E501)
🔇 Additional comments (5)
zntrack/mcp/server.py (5)
39-52: LGTM! Good error handling implementation.

The function properly handles the `PackageNotFoundError` exception and provides meaningful fallback information when package metadata is unavailable.
55-61: LGTM! Clean error handling and sorting.

The function correctly checks for package existence and returns a sorted list of nodes with clear error messaging.
64-70: LGTM! Consistent interface discovery pattern.

The function follows the same pattern as `get_node_list` but correctly uses the "zntrack.interfaces" group for interface discovery.
73-88: LGTM! Robust error handling for interface introspection.

The function properly validates interface existence and handles potential import/inspection failures with clear error messages.
449-450: LGTM! Clean main guard implementation.

The main guard properly runs the MCP server when the module is executed as a script.
Actionable comments posted: 3
♻️ Duplicate comments (1)
zntrack/mcp/server.py (1)
19-23: Harden file reads for docs: handle missing files and IO errors

These tools will raise if the markdown files are missing or unreadable. Add defensive IO handling and explicit UTF-8 to avoid runtime crashes. This mirrors a prior review you received for similar resource reads.

```diff
 @mcp.tool
 def get_graph_getting_started() -> str:
     """Get documentation on how to build a graph."""
     file = Path(__file__).parent / "resources" / "graph_getting_started.md"
-    return file.read_text()
+    try:
+        return file.read_text(encoding="utf-8")
+    except FileNotFoundError:
+        return "Documentation file not found: graph_getting_started.md"
+    except Exception as e:
+        return f"Error reading documentation: {e}"

 @mcp.tool
 def get_graph_with_groups() -> str:
     """Get documentation on how to build a graph with groups."""
     file = Path(__file__).parent / "resources" / "graph_with_groups.md"
-    return file.read_text()
+    try:
+        return file.read_text(encoding="utf-8")
+    except FileNotFoundError:
+        return "Documentation file not found: graph_with_groups.md"
+    except Exception as e:
+        return f"Error reading documentation: {e}"

 @mcp.tool
 def get_node_getting_started() -> str:
     """Get documentation on how to use ZnTrack nodes."""
     file = Path(__file__).parent / "resources" / "node_getting_started.md"
-    return file.read_text()
+    try:
+        return file.read_text(encoding="utf-8")
+    except FileNotFoundError:
+        return "Documentation file not found: node_getting_started.md"
+    except Exception as e:
+        return f"Error reading documentation: {e}"
```

Also applies to: 26-30, 33-37
🧹 Nitpick comments (3)
zntrack/mcp/server.py (3)
55-62: Unify error reporting for list-returning tools

Returning a list containing an error string violates the declared return type and complicates clients. Prefer an error dict or an empty list with a separate status. If changing the signature is too broad for this PR, at least return an empty list on "not found".

```diff
 def get_node_list(package: str) -> list[str]:
     """Get all nodes provided by a specific package."""
     nodes = get_registered_nodes("zntrack.nodes")
     if package not in nodes:
-        return [f"Package '{package}' not found."]
+        return []
     return sorted(nodes[package])

 def get_interfaces_list(package: str) -> list[str]:
     """Get interfaces that define how nodes can interact with each other."""
     interfaces = get_registered_nodes(group="zntrack.interfaces")
     if package not in interfaces:
-        return [f"Package '{package}' not found."]
+        return []
     return sorted(interfaces[package])
```

If you want stronger typing and clearer semantics, I can follow up with a separate change to return a union type or a structured payload like `{"items": [...], "error": "..."}`.
Also applies to: 65-71
170-173: Fix Ruff E501: wrap overlong string literal

Line length exceeds 90 chars. Wrap the string for lint compliance without changing behavior.

```diff
 @mcp.tool
 def get_docs_diffusion() -> str:
     """Additional information when computing self diffusion."""
-    return "It is important to run the simulation long enough. For water, at least 1 ns, for more viscous liquids, up to 50 ns or more "
+    return (
+        "It is important to run the simulation long enough. For water, at least 1 ns, "
+        "for more viscous liquids, up to 50 ns or more"
+    )
```
175-187: Clean up commented code or silence linters for long lines

Ruff flags E501 on several commented lines. Either remove stale commented blocks, reflow the comments, or add per-line noqa to keep CI green. Recommendation: delete or rewrap to avoid accruing dead code.
Also applies to: 255-255, 266-266, 293-293, 374-374, 389-389, 405-405, 417-417
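The two options the nitpick mentions — rewrapping versus a per-line suppression — look like this in practice; the string content is taken from the reviewed tool, while the variable names are illustrative:

```python
# option 1: rewrap the overlong string via implicit concatenation
note = (
    "It is important to run the simulation long enough. For water, at least 1 ns, "
    "for more viscous liquids, up to 50 ns or more."
)

# option 2: keep the line as written and silence Ruff's E501 for this line only
kept = "a deliberately kept long line"  # noqa: E501

# the rewrapped value is unchanged; only the source layout differs
assert len(note) > 90
```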
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (3)
- `pyproject.toml` (4 hunks)
- `zntrack/entrypoints.py` (1 hunks)
- `zntrack/mcp/server.py` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- zntrack/entrypoints.py
- pyproject.toml
🧰 Additional context used
🪛 Ruff (0.12.2)
zntrack/mcp/server.py
172-172: Line too long (136 > 90)
(E501)
181-181: Line too long (108 > 90)
(E501)
182-182: Line too long (100 > 90)
(E501)
185-185: Line too long (97 > 90)
(E501)
255-255: Line too long (92 > 90)
(E501)
266-266: Line too long (117 > 90)
(E501)
293-293: Line too long (97 > 90)
(E501)
374-374: Line too long (97 > 90)
(E501)
389-389: Line too long (97 > 90)
(E501)
405-405: Line too long (92 > 90)
(E501)
417-417: Line too long (92 > 90)
(E501)
🔇 Additional comments (1)
zntrack/mcp/server.py (1)
39-52: Overall: nice, concise MCP surface

The top-level inventory tools and docs exposure are straightforward and cohesive. The API surface reads well.
```python
for package, _ in nodes.items():
    try:
        metadata = importlib.metadata.metadata(package)
        package_info[package] = metadata.get("Summary", "No description available.")
    except importlib.metadata.PackageNotFoundError:
        package_info[package] = "Package not found."
```
🛠️ Refactor suggestion
Improve package metadata lookup for module-like keys
get_registered_nodes keys are often module paths (e.g., "pkg.subpkg"); importlib.metadata expects distribution names. Add a fallback via top-level package and packages_distributions() to reduce false “Package not found.” results.
```diff
-    for package, _ in nodes.items():
-        try:
-            metadata = importlib.metadata.metadata(package)
-            package_info[package] = metadata.get("Summary", "No description available.")
-        except importlib.metadata.PackageNotFoundError:
-            package_info[package] = "Package not found."
+    for pkg, _ in nodes.items():
+        try:
+            md = importlib.metadata.metadata(pkg)
+            package_info[pkg] = md.get("Summary", "No description available.")
+            continue
+        except importlib.metadata.PackageNotFoundError:
+            pass
+
+        # Fallback 1: try top-level package
+        top_pkg = pkg.split(".", 1)[0]
+        try:
+            md = importlib.metadata.metadata(top_pkg)
+            package_info[pkg] = md.get("Summary", "No description available.")
+            continue
+        except importlib.metadata.PackageNotFoundError:
+            pass
+
+        # Fallback 2: resolve distribution from package name
+        try:
+            mapping = importlib.metadata.packages_distributions()
+            dists = mapping.get(top_pkg, [])
+            for dist in dists:
+                try:
+                    md = importlib.metadata.metadata(dist)
+                    package_info[pkg] = md.get("Summary", "No description available.")
+                    break
+                except importlib.metadata.PackageNotFoundError:
+                    continue
+            else:
+                package_info[pkg] = "Package not found."
+        except Exception:
+            package_info[pkg] = "Package not found."
```

📝 Committable suggestion
Original:

```python
for package, _ in nodes.items():
    try:
        metadata = importlib.metadata.metadata(package)
        package_info[package] = metadata.get("Summary", "No description available.")
    except importlib.metadata.PackageNotFoundError:
        package_info[package] = "Package not found."
```

Suggested:

```python
for pkg, _ in nodes.items():
    try:
        md = importlib.metadata.metadata(pkg)
        package_info[pkg] = md.get("Summary", "No description available.")
        continue
    except importlib.metadata.PackageNotFoundError:
        pass

    # Fallback 1: try top-level package
    top_pkg = pkg.split(".", 1)[0]
    try:
        md = importlib.metadata.metadata(top_pkg)
        package_info[pkg] = md.get("Summary", "No description available.")
        continue
    except importlib.metadata.PackageNotFoundError:
        pass

    # Fallback 2: resolve distribution from package name
    try:
        mapping = importlib.metadata.packages_distributions()
        dists = mapping.get(top_pkg, [])
        for dist in dists:
            try:
                md = importlib.metadata.metadata(dist)
                package_info[pkg] = md.get("Summary", "No description available.")
                break
            except importlib.metadata.PackageNotFoundError:
                continue
        else:
            package_info[pkg] = "Package not found."
    except Exception:
        package_info[pkg] = "Package not found."
```
🤖 Prompt for AI Agents
In zntrack/mcp/server.py around lines 45 to 51, the current loop calls
importlib.metadata.metadata(package) where package is often a module path (e.g.,
"pkg.subpkg"), causing false PackageNotFoundError; update lookup to first try
metadata(package), and if that fails, derive the top-level module name (split on
'.') and consult importlib.metadata.packages_distributions() to map that
top-level package to one or more distribution names, then attempt metadata() on
the first matching distribution; handle cases where packages_distributions
returns no distributions and still fall back to a clear "Package not found."
message, ensuring exceptions are caught and only treated as missing metadata.
```python
type_hints = get_type_hints(node_class)

# Collect dataclass field names
dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
for field in dataclasses.fields(node_class):
    field_type = field.metadata.get(FIELD_TYPE)
    if field_type is not None:
        hint = type_hints.get(field.name, None)
        fields_by_type[field_type].append((field.name, repr(hint)))
```
Protect against get_type_hints/dataclass failures
get_type_hints can raise (e.g., unresolved forward refs), and dataclasses.fields raises on non-dataclasses. These will currently crash the tool.
```diff
-    type_hints = get_type_hints(node_class)
+    try:
+        type_hints = get_type_hints(node_class)
+    except Exception:
+        type_hints = {}

-    # Collect dataclass field names
-    dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
-    for field in dataclasses.fields(node_class):
-        field_type = field.metadata.get(FIELD_TYPE)
-        if field_type is not None:
-            hint = type_hints.get(field.name, None)
-            fields_by_type[field_type].append((field.name, repr(hint)))
+    # Collect dataclass field names (only if it's a dataclass)
+    dataclass_field_names = set()
+    if dataclasses.is_dataclass(node_class):
+        dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
+        for field in dataclasses.fields(node_class):
+            field_type = field.metadata.get(FIELD_TYPE)
+            if field_type is not None:
+                hint = type_hints.get(field.name, None)
+                fields_by_type[field_type].append((field.name, repr(hint)))
```

📝 Committable suggestion
Original:

```python
type_hints = get_type_hints(node_class)
# Collect dataclass field names
dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
for field in dataclasses.fields(node_class):
    field_type = field.metadata.get(FIELD_TYPE)
    if field_type is not None:
        hint = type_hints.get(field.name, None)
        fields_by_type[field_type].append((field.name, repr(hint)))
```

Suggested:

```python
try:
    type_hints = get_type_hints(node_class)
except Exception:
    type_hints = {}
# Collect dataclass field names (only if it's a dataclass)
dataclass_field_names = set()
if dataclasses.is_dataclass(node_class):
    dataclass_field_names = {f.name for f in dataclasses.fields(node_class)}
    for field in dataclasses.fields(node_class):
        field_type = field.metadata.get(FIELD_TYPE)
        if field_type is not None:
            hint = type_hints.get(field.name, None)
            fields_by_type[field_type].append((field.name, repr(hint)))
```
🤖 Prompt for AI Agents
In zntrack/mcp/server.py around lines 118-127, protect calls to get_type_hints
and dataclasses.fields: first check dataclasses.is_dataclass(node_class) and
skip processing if False; wrap get_type_hints(node_class) in a try/except and
fall back to an empty dict on failure (optionally logging the exception at debug
level); also wrap dataclasses.fields(node_class) access in a try/except (or rely
on the is_dataclass guard) to avoid crashing on non-dataclasses and continue
gracefully. Ensure downstream code uses the safe empty type_hints and skips
field processing when dataclasses.fields cannot be obtained.
```python
        if isinstance(attr, property):
            return_type = get_type_hints(attr.fget).get("return", None)
            methods["properties"][attr_name] = repr(return_type)

        # Class methods
        elif inspect.ismethod(attr) and getattr(attr, "__self__", None) is node_class:
            return_type = get_type_hints(attr).get("return", None)
            methods["class_methods"][attr_name] = repr(return_type)

        # Static methods
        elif isinstance(inspect.getattr_static(node_class, attr_name), staticmethod):
            return_type = get_type_hints(attr).get("return", None)
            methods["static_methods"][attr_name] = repr(return_type)

        # Instance methods
        elif inspect.isfunction(attr):
            return_type = get_type_hints(attr).get("return", None)
            methods["instance_methods"][attr_name] = repr(return_type)
```
🛠️ Refactor suggestion
Harden method/property introspection; avoid AttributeError and type-hints crashes
- property: fget may be None and get_type_hints can raise
- classmethod: detection is more robust via getattr_static(..., classmethod)
- static/instance: get_type_hints can raise
These can currently raise and break the tool.
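The classmethod point is easy to verify in isolation: `inspect.getattr_static` returns the raw descriptor rather than the bound view, so `classmethod`/`staticmethod` detection does not depend on how the attribute was looked up. A minimal, self-contained sketch:

```python
import inspect


class Demo:
    @classmethod
    def cm(cls): ...

    @staticmethod
    def sm(): ...

    def im(self): ...


# Normal lookup binds the classmethod, hiding the wrapper...
assert inspect.ismethod(Demo.cm) and Demo.cm.__self__ is Demo
# ...while getattr_static exposes the descriptors themselves
assert isinstance(inspect.getattr_static(Demo, "cm"), classmethod)
assert isinstance(inspect.getattr_static(Demo, "sm"), staticmethod)
assert inspect.isfunction(inspect.getattr_static(Demo, "im"))
```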
```diff
-        if isinstance(attr, property):
-            return_type = get_type_hints(attr.fget).get("return", None)
-            methods["properties"][attr_name] = repr(return_type)
+        if isinstance(attr, property):
+            if attr.fget is not None:
+                try:
+                    return_type = get_type_hints(attr.fget).get("return", None)
+                except Exception:
+                    return_type = None
+            else:
+                return_type = None
+            methods["properties"][attr_name] = repr(return_type)
-        # Class methods
-        elif inspect.ismethod(attr) and getattr(attr, "__self__", None) is node_class:
-            return_type = get_type_hints(attr).get("return", None)
-            methods["class_methods"][attr_name] = repr(return_type)
+        # Class methods
+        elif isinstance(inspect.getattr_static(node_class, attr_name), classmethod):
+            try:
+                return_type = get_type_hints(attr).get("return", None)
+            except Exception:
+                return_type = None
+            methods["class_methods"][attr_name] = repr(return_type)
         # Static methods
         elif isinstance(inspect.getattr_static(node_class, attr_name), staticmethod):
-            return_type = get_type_hints(attr).get("return", None)
+            try:
+                return_type = get_type_hints(attr).get("return", None)
+            except Exception:
+                return_type = None
             methods["static_methods"][attr_name] = repr(return_type)
         # Instance methods
         elif inspect.isfunction(attr):
-            return_type = get_type_hints(attr).get("return", None)
+            try:
+                return_type = get_type_hints(attr).get("return", None)
+            except Exception:
+                return_type = None
             methods["instance_methods"][attr_name] = repr(return_type)
```

📝 Committable suggestion
```python
        if isinstance(attr, property):
            if attr.fget is not None:
                try:
                    return_type = get_type_hints(attr.fget).get("return", None)
                except Exception:
                    return_type = None
            else:
                return_type = None
            methods["properties"][attr_name] = repr(return_type)
        # Class methods
        elif isinstance(inspect.getattr_static(node_class, attr_name), classmethod):
            try:
                return_type = get_type_hints(attr).get("return", None)
            except Exception:
                return_type = None
            methods["class_methods"][attr_name] = repr(return_type)
        # Static methods
        elif isinstance(inspect.getattr_static(node_class, attr_name), staticmethod):
            try:
                return_type = get_type_hints(attr).get("return", None)
            except Exception:
                return_type = None
            methods["static_methods"][attr_name] = repr(return_type)
        # Instance methods
        elif inspect.isfunction(attr):
            try:
                return_type = get_type_hints(attr).get("return", None)
            except Exception:
                return_type = None
            methods["instance_methods"][attr_name] = repr(return_type)
```
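The `fget is None` branch is not hypothetical: a write-only property has no getter, so an unguarded `get_type_hints(attr.fget)` becomes `get_type_hints(None)` and raises. A minimal sketch in the spirit of the suggested guard:

```python
from typing import get_type_hints


class Demo:
    def _set(self, value):
        self._value = value

    # Write-only property: no getter, so fget is None
    prop = property(fset=_set)


attr = Demo.__dict__["prop"]
assert isinstance(attr, property) and attr.fget is None

# Guarded lookup: skip get_type_hints entirely when there is no getter
if attr.fget is not None:
    try:
        return_type = get_type_hints(attr.fget).get("return", None)
    except Exception:
        return_type = None
else:
    return_type = None
```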

**Summary by CodeRabbit**

- New Features
- Documentation
- Chores