diff --git a/interpreter.py b/interpreter.py old mode 100755 new mode 100644 index a13cfe4..b4fc16b --- a/interpreter.py +++ b/interpreter.py @@ -30,6 +30,18 @@ def build_parser(): + """ + Create and return the command-line ArgumentParser configured for the Code Interpreter CLI. + + The parser includes flags for execution and output control (--exec, --save_code, --display_code), + mode selection (--mode with choices 'code', 'script', 'command', 'vision', 'chat'), model override (--model), + language selection (--lang), history memory (--history), unsafe mode (--unsafe), upgrade workflow (--upgrade), + and optional input file (--file, uses 'prompt.txt' when provided without a value). It also exposes a version + option and enforces mutual exclusion between --cli and --tui UI selection flags. + + Returns: + argparse.ArgumentParser: A fully configured ArgumentParser for the interpreter CLI. + """ parser = argparse.ArgumentParser(description='Code - Interpreter') parser.add_argument('--exec', '-e', action='store_true', default=False, help='Execute the code') parser.add_argument('--save_code', '-s', action='store_true', default=False, help='Save the generated code') @@ -49,10 +61,26 @@ def build_parser(): def _get_default_model(): + """ + Get the default model name used when no model is specified. + + Returns: + str: The default model name to use for code generation. + """ return UtilityManager.get_default_model_name() def prepare_args(args, argv): + """ + Finalize CLI/TUI selection and populate missing defaults on the parsed arguments. + + Parameters: + args (argparse.Namespace): Parsed command-line arguments to finalize; may be modified in-place. + argv (Sequence[str]): Original program argv used to detect whether runtime arguments were provided. + + Returns: + The finalized argument namespace, or the value returned by TerminalUI().launch(args) when TUI mode is launched. 
+ """ no_runtime_args = len(argv) <= 1 if no_runtime_args and not args.cli and not args.tui: args.tui = True @@ -69,6 +97,12 @@ def prepare_args(args, argv): def main(argv=None): + """ + Parse command-line arguments, prepare runtime settings, and start the interpreter process. + + Parameters: + argv (list[str] | None): Optional argument vector to parse; when `None`, `sys.argv` is used. This allows overriding CLI input for testing or embedding. + """ argv = argv or sys.argv parser = build_parser() args = parser.parse_args(argv[1:]) diff --git a/libs/code_interpreter.py b/libs/code_interpreter.py index 6bb20ba..b67b149 100644 --- a/libs/code_interpreter.py +++ b/libs/code_interpreter.py @@ -17,9 +17,27 @@ class CodeInterpreter: def __init__(self): + """ + Initialize the CodeInterpreter instance and configure its logger. + + Sets self.logger to a Logger initialized with the file "logs/code-interpreter.log". + """ self.logger = Logger.initialize("logs/code-interpreter.log") def _get_subprocess_security_kwargs(self, sandbox_context=None): + """ + Builds subprocess keyword arguments applying working directory, environment, and OS-specific process isolation flags. + + Parameters: + sandbox_context (optional): An object that may have `cwd` and `env` attributes; those values (or `None` if absent) are used to populate the corresponding subprocess kwargs. + + Returns: + dict: A mapping suitable for passing to subprocess functions containing: + - `cwd`: the working directory from `sandbox_context.cwd` or `None`. + - `env`: the environment mapping from `sandbox_context.env` or `None`. + - On Windows (`os.name == "nt"`): `creationflags` (int) combining available flags such as `CREATE_NO_WINDOW` and `CREATE_NEW_PROCESS_GROUP`. + - On non-Windows: `start_new_session` set to `True`. 
+ """ kwargs = { "cwd": getattr(sandbox_context, "cwd", None), "env": getattr(sandbox_context, "env", None), @@ -34,6 +52,15 @@ def _get_subprocess_security_kwargs(self, sandbox_context=None): return kwargs def _build_command_invocation(self, command: str): + """ + Constructs a platform-appropriate command invocation list suitable for passing to subprocess functions. + + Parameters: + command (str): The shell command string to execute. + + Returns: + list: A list of program and argument tokens that invoke the given command on the current OS (Windows uses `cmd.exe /d /c`, Linux/macOS prefers `/bin/bash --noprofile --norc -lc` when available, otherwise `sh -c`). + """ if os.name == "nt": return ["cmd.exe", "/d", "/c", command] bash_path = "/bin/bash" if os.path.exists("/bin/bash") else None @@ -42,6 +69,17 @@ def _build_command_invocation(self, command: str): return ["sh", "-c", command] def _execute_script(self, script: str, shell: str, sandbox_context=None): + """ + Execute a script using the specified shell and return its captured output and error text. + + Parameters: + script (str): The script text to execute. + shell (str): The shell to use; expected values are `"bash"`, `"powershell"`, or `"applescript"`. + sandbox_context (optional): An object that may provide `cwd`, `env`, and `timeout_seconds` to control the subprocess environment and timeout. + + Returns: + (tuple): A pair `(stdout, stderr)` where `stdout` is the trimmed standard output string or `None` if no output, and `stderr` is the trimmed standard error string or `None` if no error. On timeout, `stderr` will be `"Execution timed out."`. If an invalid `shell` is provided, returns `(None, "Invalid shell selected: ")`. 
+ """ stdout = stderr = None try: popen_kwargs = { @@ -124,9 +162,19 @@ def save_code(self, filename='output/code_generated.py', code=None): def extract_code(self, code: str, start_sep='```', end_sep='```', skip_first_line=False, code_mode=False): """ - Extracts the code from the provided string. - If the string contains the start and end separators, it extracts the code between them. - Otherwise, it returns the original string. + Extracts a code snippet delimited by the provided start and end separators from a text block. + + If the input contains triple backticks ("```") but the provided separators are single backticks, the function treats the separators as triple backticks. When a matching fenced region is found, the content between the separators is returned with optional adjustments described below; if no matching separators are present, the original `code` string is returned. + + Parameters: + code (str): The input text containing code or plain text. If `None`, the function returns `None`. + start_sep (str): Opening separator that marks the start of the code block (default: "```"). + end_sep (str): Closing separator that marks the end of the code block (default: "```"). + skip_first_line (bool): When True and `code_mode` is True, skip the first line of the fenced block if the opening separator is not immediately followed by a newline. + code_mode (bool): When True, treat the extracted content as code (affects `skip_first_line` behavior). When False, non-code cleanup is applied (see returns). + + Returns: + str or None: The extracted code block (possibly adjusted), the original `code` string if no matching separators are found, or `None` if the input `code` is `None`. 
""" try: if code is None: @@ -172,6 +220,24 @@ def extract_code(self, code: str, start_sep='```', end_sep='```', skip_first_lin raise Exception(f"Error occurred while extracting code: {exception}") def execute_code(self, code, language, sandbox_context=None): + """ + Execute the provided source code in the specified language and return its captured output and errors. + + Executes `code` using a subprocess for the given `language` and returns the subprocess stdout and stderr as decoded UTF-8 strings. Supports "python" (runs `python -c`) and "javascript" (runs `node -e`). Applies optional sandboxing parameters from `sandbox_context` (cwd, env, and timeout_seconds) to the subprocess invocation. + + Parameters: + code (str): Source code to execute. + language (str): Programming language name (e.g., "python", "javascript"). + sandbox_context (optional): An object that may provide `cwd`, `env`, and `timeout_seconds` to control subprocess execution and timeout. + + Returns: + tuple: `(stdout, stderr)` where each is a decoded UTF-8 string containing the subprocess standard output and standard error. + str: If the provided `code` is empty or only whitespace, returns the message "Code is empty. Cannot execute an empty code." + tuple: `(None, "Execution timed out.")` if the subprocess exceeds the configured timeout. + + Raises: + Exception: If required compilers/interpreters are not found, if the language is unsupported, or on other execution errors. + """ try: language = language.lower() self.logger.info(f"Running code: {code[:100]} in language: {language}") @@ -226,6 +292,20 @@ def execute_code(self, code, language, sandbox_context=None): raise exception def execute_script(self, script:str, os_type:str='macos', sandbox_context=None): + """ + Execute a platform-specific script and return its captured output and error. + + Parameters: + script (str): The script content to run. 
+ os_type (str): Target operating system; recognized values include 'macos', 'linux', and 'windows' (case-insensitive). + sandbox_context (optional): Sandbox configuration object (e.g., providing `cwd`, `env`, and `timeout_seconds`) applied to the subprocess invocation. + + Returns: + tuple: (stdout, stderr) where `stdout` is the script's standard output string or None, and `stderr` is the script's standard error string or None. + + Raises: + ValueError: If `script` or `os_type` is missing, or if `os_type` is not one of 'macos', 'linux', or 'windows'. + """ output = error = None try: if not script: @@ -254,6 +334,26 @@ def execute_script(self, script:str, os_type:str='macos', sandbox_context=None): return output, error def execute_command(self, command:str, sandbox_context=None): + """ + Execute a shell command in a subprocess and return its captured stdout and stderr. + + Parameters: + command (str): The command string to execute; must be provided. + sandbox_context (optional): Optional object that may supply execution parameters: + - cwd: working directory for the subprocess + - env: environment variables mapping for the subprocess + - timeout_seconds: execution timeout in seconds (defaults to 30) + Additionally used to determine OS-specific subprocess kwargs (e.g., creationflags or start_new_session). + + Returns: + tuple: (stdout, stderr) + - stdout (str or None): UTF-8 decoded standard output from the command, or None if execution timed out. + - stderr (str): UTF-8 decoded standard error from the command, or the string "Execution timed out." if the process exceeded the timeout. + + Raises: + ValueError: If `command` is empty or not provided. + Exception: Re-raises any unexpected exceptions encountered during execution. 
+ """ try: if not command: raise ValueError("Command must be provided.") diff --git a/libs/history_manager.py b/libs/history_manager.py index 1a06ff0..0fedba0 100644 --- a/libs/history_manager.py +++ b/libs/history_manager.py @@ -6,6 +6,12 @@ class History: def __init__(self, history_file: str): + """ + Initialize the History instance, set up its logger, and ensure the history file and its parent directory exist (creating them if necessary). If the history file is newly created, write an empty JSON array into it. + + Parameters: + history_file (str): Path to the JSON file used to store history; parent directories will be created if they do not exist. + """ self.history_file = history_file self.logger = Logger.initialize("logs/interpreter.log") history_dir = os.path.dirname(self.history_file) @@ -16,6 +22,21 @@ def __init__(self, history_file: str): json.dump([], history_file) def save_history_json(self, task, mode, os_name, language, prompt, code_snippet, code_output, model_name): + """ + Append a structured history entry to the JSON array stored at the instance's history_file. + + Builds an entry containing assistant metadata (`task`, `mode`, `os`, `language`, `model`), the user `prompt`, and system `code` and `output`, then appends it to the JSON array in the history file. If the file does not exist or is empty, a new JSON array is created containing the entry. On failure the error is logged and the original exception is re-raised. + + Parameters: + task (str): High-level task or intent for the assistant. + mode (str): Mode or context identifier for the session. + os_name (str): Operating system name or target environment. + language (str): Programming or natural language associated with the entry. + prompt (str): User prompt or input text. + code_snippet (str): Code produced or executed in the session. + code_output (str): Output or result produced by the code. + model_name (str): Name of the model used by the assistant. 
+ """ try: history_entry = { "assistant": { @@ -46,7 +67,15 @@ def save_history_json(self, task, mode, os_name, language, prompt, code_snippet, raise def _get_data_for_key(self, key: str) -> List[Any]: - """Returns a list of all values for the specified key in the history data.""" + """ + Collects all values associated with a given key from stored history entries. + + Parameters: + key (str): The key to look up within each history entry (searched first in the entry's 'assistant' object, then in 'system'). + + Returns: + values (List[Any]): A list of values found for `key` across all history entries. Returns an empty list if the history file is missing, empty, or no entries contain the key. + """ try: if not os.path.exists(self.history_file): return [] @@ -69,7 +98,17 @@ def _get_data_for_key(self, key: str) -> List[Any]: raise def _get_last_entries(self, count: int) -> List[dict]: - """Returns the last n entries from the history data.""" + """ + Retrieve the most recent history entries. + + Returns up to `count` of the most recent history records from the history file; returns an empty list if the history file is missing or empty. + + Parameters: + count (int): Maximum number of entries to return. If fewer entries exist, all available entries are returned. + + Returns: + last_entries (List[dict]): A list of history entry dictionaries (up to `count`), ordered from oldest to newest within the returned slice; empty list if no entries are available. + """ try: if not os.path.exists(self.history_file) or os.path.getsize(self.history_file) == 0: return [] @@ -99,6 +138,16 @@ def _get_last_entries_for_key(self, key: str, count: int) -> List[Any]: raise def _get_last_entries_for_keys(self, count: int, *keys: str) -> List[dict]: + """ + Assembles up to `count` session dictionaries where each requested key maps to its corresponding most-recent value or `None`. + + Parameters: + count (int): Maximum number of sessions to return. 
+ *keys (str): One or more history keys to include in each session. + + Returns: + List[dict]: A list of up to `count` dictionaries. Each dictionary maps each requested key to the value at that position in the key's recent-values list or `None` if no value exists for that position. Returns an empty list if none of the requested keys have any entries. + """ last_entries = [] try: entries = {key: self._get_last_entries_for_key(key, count) for key in keys} @@ -118,7 +167,25 @@ def get_chat_history(self, count: int) -> List[dict]: return self._get_last_entries_for_keys(count, "task", "output") def get_code_history(self, count: int) -> List[dict]: + """ + Retrieve the most recent code sessions with their corresponding outputs. + + Parameters: + count (int): Maximum number of recent sessions to return. + + Returns: + List[dict]: A list of up to `count` session dictionaries where each dictionary contains the keys `"code"` and `"output"` mapped to their most recent values; missing values are `None`. + """ return self._get_last_entries_for_keys(count, "code", "output") def get_full_history(self, count: int) -> List[dict]: + """ + Return the most recent sessions containing task, code, and output entries. + + Parameters: + count (int): Maximum number of recent sessions to include. + + Returns: + history (List[dict]): A list with up to `count` session dictionaries. Each session maps the keys `"task"`, `"code"`, and `"output"` to their most recent values (or `None` if a value is missing). + """ return self._get_last_entries_for_keys(count, "task", "code", "output") diff --git a/libs/interpreter_lib.py b/libs/interpreter_lib.py index 0491374..4508feb 100644 --- a/libs/interpreter_lib.py +++ b/libs/interpreter_lib.py @@ -44,6 +44,20 @@ class Interpreter: console = Console() def __init__(self, args): + """ + Initialize the Interpreter instance, configure runtime helpers and flags, and perform setup. 
+ + Parameters: + args (argparse.Namespace): Parsed CLI arguments; expected attributes: + - unsafe (bool, optional): If true, allow execution paths that bypass safety blocking. + - tui (bool, optional): If true, enable the optional terminal UI. + + Behavior: + Creates and assigns core helper objects (utility manager, code interpreter, package manager, + history manager, logger, execution safety manager, optional TerminalUI), initializes + execution and retry limits, stores initial history settings, and invokes instance + initialization via initialize(). + """ self.args = args self.history = [] self.history_count = 3 @@ -66,6 +80,18 @@ def __init__(self, args): self.initialize() def initialize(self): + """ + Initialize interpreter runtime settings and external helpers. + + Configures language, model, mode, prompt source (stdin versus file), display/save/execute flags, and optional history flag; loads the appropriate system message for the active mode (vision/chat or from system/system_message.txt), initializes the model client and mode-specific flags, and attempts to enable readline history. + + Raises: + FileNotFoundError or other IO/error raised when reading `system/system_message.txt` fails. + + Notes: + - Sets instance attributes such as INTERPRETER_LANGUAGE, SAVE_CODE, EXECUTE_CODE, DISPLAY_CODE, INTERPRETER_MODEL, INTERPRETER_MODEL_LABEL, INTERPRETER_MODE, INTERPRETER_PROMPT_FILE, INTERPRETER_PROMPT_INPUT, INTERPRETER_HISTORY, and system_message. + - Calls initialize_client(), initialize_mode(), and utility_manager.initialize_readline_history() (the latter errors are logged but do not propagate). 
+ """ self.INTERPRETER_LANGUAGE = self.args.lang if self.args.lang else 'python' self.SAVE_CODE = self.args.save_code self.EXECUTE_CODE = self.args.exec @@ -118,6 +144,15 @@ def initialize(self): self.logger.error("Exception on initializing readline history") def _is_recoverable_runtime_error(self, error_text): + """ + Detects whether the provided runtime error text signals a recoverable or transient issue. + + Parameters: + error_text (str | None): The error message to inspect; `None` is treated as an empty string. + + Returns: + True if the text contains markers of recoverable/provider/transient errors (e.g., rate limits, quota/credits, authentication/authorization issues, model or resource not found, timeouts, or connection problems), False otherwise. + """ recoverable_errors = [ "rate limit", "ratelimit", @@ -140,6 +175,15 @@ def _is_recoverable_runtime_error(self, error_text): return any(error in error_text for error in recoverable_errors) def _format_runtime_error_message(self, error_text): + """ + Normalize and clean an error message for concise display. + + Parameters: + error_text (str | None): The raw error text to normalize; may be None or empty. + + Returns: + str: A cleaned, single-line error message with URLs and common error prefixes removed and excess whitespace collapsed. Returns "Unknown error" when `error_text` is falsy. + """ message = error_text or "Unknown error" message = re.sub(r"https?://\S+", "", message) message = re.sub(r"litellm\.[A-Za-z]+Error:\s*", "", message) @@ -150,6 +194,17 @@ def _format_runtime_error_message(self, error_text): return message def _is_retryable_request_error(self, error_text): + """ + Determines whether a provider request error message indicates a retryable failure. + + Examines the provided error text for markers that imply either retryable transient issues (rate limits, timeouts, connection problems, HTTP 5xx/429) or non-retryable issues (billing, quota, authentication, missing model). 
Non-retryable markers take precedence. + + Parameters: + error_text (str): The error message or text to analyze; may be None or empty. + + Returns: + True if the error text suggests a retryable request error, False otherwise. + """ error_text = (error_text or "").lower() retryable_markers = [ "rate limit", @@ -180,6 +235,21 @@ def _is_retryable_request_error(self, error_text): return any(marker in error_text for marker in retryable_markers) def _generate_content_with_retries(self, message, chat_history, config_values=None, image_file=None): + """ + Attempt to generate model content, retrying on transient request errors up to the configured retry limit. + + Parameters: + message (str): User prompt or task sent to the model. + chat_history (list|None): Optional conversation history to include in the request. + config_values (object|None): Optional overrides for model request parameters (e.g., temperature, max_tokens, provider). + image_file (str|None): Optional path to an image file used for vision-mode requests. + + Returns: + (str): The generated model text returned by generate_content. + + Raises: + Exception: Re-raises the last encountered exception when a non-retryable error occurs or all retry attempts are exhausted. + """ last_exception = None for attempt in range(1, self.MAX_LLM_RETRIES + 1): try: @@ -195,12 +265,33 @@ def _generate_content_with_retries(self, message, chat_history, config_values=No raise last_exception def _apply_mode(self, mode): + """ + Set the interpreter mode and update corresponding boolean mode flags. + + Parameters: + mode (str): Mode name to apply (case-insensitive). Expected values include "vision", "script", "command", "code", and "chat". + + Effects: + Sets self.INTERPRETER_MODE to the lowercased mode and updates the boolean attributes + `VISION_MODE`, `SCRIPT_MODE`, `COMMAND_MODE`, `CODE_MODE`, and `CHAT_MODE` so that + only the attribute matching the applied mode is True and the others are False. 
+ """ modes = {'vision': 'VISION_MODE', 'script': 'SCRIPT_MODE', 'command': 'COMMAND_MODE', 'code': 'CODE_MODE', 'chat': 'CHAT_MODE'} self.INTERPRETER_MODE = mode.lower() for key in modes: setattr(self, modes[key], self.INTERPRETER_MODE == key) def _open_tui_settings(self, setting_type): + """ + Open an interactive terminal UI to select or modify a runtime setting. + + Parameters: + setting_type (str): One of "mode", "model", "language", or "settings" indicating which UI selector to open. + + Returns: + dict: A mapping with the selected key and value (e.g., {"mode": "code"}, {"model": "gpt"}), or the result of interactive_settings when `setting_type` is "settings". + None: If no terminal UI is configured or the `setting_type` is unrecognized. + """ if not self.terminal_ui: return None if setting_type == "mode": @@ -214,6 +305,21 @@ def _open_tui_settings(self, setting_type): return None def _apply_runtime_settings(self, settings): + """ + Apply runtime configuration overrides from a settings mapping to the interpreter. + + Updates interpreter mode, language, display/execute/save flags, history flag, and model selection when corresponding keys are present in the provided `settings` dict. If a `model` is specified and its config file is missing, prints a user-facing message; if present, sets the model and reinitializes the client. + + Parameters: + settings (dict): Mapping of runtime settings to apply. Recognized keys: + - "mode": interpreter mode name (e.g., "code", "script", "chat", "vision", "command"); applied via _apply_mode(). + - "language": programming language name to set INTERPRETER_LANGUAGE. + - "display_code": boolean to set DISPLAY_CODE. + - "execute_code": boolean to set EXECUTE_CODE. + - "save_code": boolean to set SAVE_CODE. + - "history": boolean to set INTERPRETER_HISTORY. 
+ - "model": model name string; if a corresponding configs/.config file exists, sets INTERPRETER_MODEL and INTERPRETER_MODEL_LABEL and calls initialize_client(); otherwise prints a guidance message. + """ if not settings: return if "mode" in settings and settings["mode"]: @@ -239,6 +345,13 @@ def _apply_runtime_settings(self, settings): self.initialize_client() def _display_session_banner(self, os_name, input_prompt_mode): + """ + Prints a compact session banner showing OS, language, mode, prompt source, and model label. + + Parameters: + os_name (str): The operating system name to display (e.g., "Windows 10" or "Linux"). + input_prompt_mode (str): Source of prompts; when equal to "input" (case-insensitive) displays "input", otherwise displays "file". + """ short_lang = "python" if self.INTERPRETER_LANGUAGE == "python" else "javascript" short_prompt_mode = "input" if input_prompt_mode.lower() == "input" else "file" short_os_name = os_name.replace("Windows ", "Win") @@ -250,6 +363,20 @@ def _display_session_banner(self, os_name, input_prompt_mode): self.console.print(f"[bold bright_blue]{session_line}[/bold bright_blue]", overflow="ignore", no_wrap=True) def _build_repair_prompt(self, task, prompt, code_snippet, error_text, os_name, code_output=None): + """ + Builds a bounded repair prompt instructing the model to return a single corrected code block for a failed execution. + + Parameters: + task (str): The original user task or intent. + prompt (str): The resolved prompt that was sent to the model. + code_snippet (str): The generated code that failed when executed. + error_text (str): The execution error message observed. + os_name (str): The operating system name to contextualize platform-specific fixes. + code_output (str, optional): Observed stdout produced before the failure, if any. + + Returns: + str: A repair prompt asking the model to privately reason about the failure and return only the corrected code enclosed in one triple-backtick block. 
+ """ if self.COMMAND_MODE: target = "single terminal command" elif self.SCRIPT_MODE: @@ -275,9 +402,28 @@ def _build_repair_prompt(self, task, prompt, code_snippet, error_text, os_name, ) def _task_has_any(self, text, phrases): + """ + Check whether any of the provided substrings appears in the given text. + + Parameters: + text (str): The text to search within. + phrases (Iterable[str]): An iterable of substring phrases to look for. + + Returns: + True if any phrase from `phrases` is found in `text`, False otherwise. + """ return any(phrase in text for phrase in phrases) def _is_simple_directory_listing_task(self, task_lower): + """ + Determine whether a user task is a simple request to list files in the current directory without requesting charts, images, tables, or size information. + + Parameters: + task_lower (str): The user task text normalized to lowercase. + + Returns: + bool: `True` if the task matches common directory-listing phrases and does not contain disallowed keywords (e.g., "chart", "graph", "png", "table", "size"); `False` otherwise. + """ if not task_lower: return False @@ -302,6 +448,22 @@ def _is_simple_directory_listing_task(self, task_lower): return self._task_has_any(task_lower, list_phrases) and not self._task_has_any(task_lower, disallowed) def _maybe_simplify_generated_code(self, task, code_snippet): + """ + Simplify generated code for trivial tasks (exact print, current working directory, simple directory listing) when in code mode. + + When `CODE_MODE` is enabled and both `task` and `code_snippet` are strings, this returns a minimal, language-appropriate code snippet for: + - requests to "print(s) exactly ''" (produces a single print/log statement), + - requests mentioning the "current working directory" (produces a cwd print), + - simple directory-listing requests (produces a short listing loop). + For unsupported languages or when no simplification applies, returns the original `code_snippet`. 
+ + Parameters: + task (str): The user's task/description used to detect simple patterns. + code_snippet (str): The original generated code to potentially simplify. + + Returns: + str: A simplified code snippet tailored to `INTERPRETER_LANGUAGE` when a recognized trivial pattern is found; otherwise the original `code_snippet`. + """ if not self.CODE_MODE or not isinstance(task, str) or not isinstance(code_snippet, str): return code_snippet @@ -329,6 +491,17 @@ def _maybe_simplify_generated_code(self, task, code_snippet): return code_snippet def _execute_generated_output(self, code_snippet, os_name, force_execute=False): + """ + Assess the generated code against the safety policy and, if permitted, execute it inside a sandboxed context. + + Parameters: + code_snippet (str): The code or command block produced by the model to be assessed and potentially executed. + os_name (str): Operating system name used to select the appropriate execution path (e.g., "linux", "windows", "darwin"). + force_execute (bool): If True, bypasses interactive execution approval and forces execution when allowed by safety. + + Returns: + (stdout, stderr) tuple from the execution where `stdout` is the captured standard output (or None) and `stderr` is None on success or an error string on failure. If execution is prevented by the safety policy, returns (None, "Safety blocked: "). 
+ """ decision = self.safety_manager.assess_execution(code_snippet, self.INTERPRETER_MODE) if not self.UNSAFE_EXECUTION and not decision.allowed: reason_text = "; ".join(decision.reasons) @@ -343,6 +516,27 @@ def _execute_generated_output(self, code_snippet, os_name, force_execute=False): self.safety_manager.cleanup_sandbox_context(sandbox_context) def _attempt_repair_after_failure(self, task, prompt, code_snippet, code_error, os_name, start_sep, end_sep, skip_first_line, extracted_file_name, code_output=None): + """ + Attempt to iteratively repair a failing code snippet by requesting corrected code from the model and re-executing it until a successful result, a safety block, or the repair attempt limit is reached. + + Parameters: + task (str): Original user task description driving the generation. + prompt (str): The resolved prompt that was sent to the model. + code_snippet (str): The most recently generated code to attempt to repair. + code_error (str): Error message or stderr produced when the code last executed. + os_name (str): Target operating system name used to tailor repairs and execution. + start_sep (str): Opening fence/marker used when extracting code blocks from model output. + end_sep (str): Closing fence/marker used when extracting code blocks from model output. + skip_first_line (bool): Whether the first line of the extracted fenced block should be skipped when extracting runnable code. + extracted_file_name (str): Path or name of any file referenced in the prompt (used as context for repair generation). + code_output (str | None): Stdout from the previous execution, if any. + + Returns: + tuple: (final_snippet, stdout, error) + - final_snippet (str): The last attempted code snippet (repaired or original). + - stdout (str | None): Captured standard output from the last execution, or None if none produced. + - error (str | None): Error text from the last execution, or None if the last execution succeeded. 
+ """ circuit_breaker = RepairCircuitBreaker(max_attempts=self.MAX_REPAIR_ATTEMPTS) current_snippet = code_snippet current_error = code_error @@ -373,12 +567,31 @@ def _attempt_repair_after_failure(self, task, prompt, code_snippet, code_error, return current_snippet, current_output, current_error def _safe_input(self, prompt_text, default=None): + """ + Prompt the user with prompt_text and return a default value if input is interrupted (EOF). + + Parameters: + prompt_text (str): Text displayed to the user when requesting input. + default (Optional[str]): Value returned when EOF is encountered; defaults to None. + + Returns: + user_input (str or None): The entered string, or `default` if EOF was raised. + """ try: return input(prompt_text) except EOFError: return default def initialize_client(self): + """ + Prepare model configuration and validate required API credentials from the local `.env`. + + Reads the interpreter model configuration, sets the active model name/label, and ensures environment credentials required for the selected provider are present and well-formed. For local models, client initialization is skipped and a default local OpenAI API key is injected into the environment when one is not present. For remote providers, the method determines the expected API key variable and enforces presence and basic prefix/length constraints. + + Raises: + Exception: If the expected provider API key is missing from the environment or `.env`. + Exception: If the found API key does not match the required prefix or fails the minimum length check. + """ env_path = os.path.join(os.getcwd(), ".env") load_dotenv(dotenv_path=env_path, override=True) self.logger.info("Initializing Client") @@ -455,6 +668,11 @@ def initialize_client(self): raise Exception(f"{api_key_name} should have length greater than {api_key_info['length']}. Please check your .env file.") def initialize_mode(self): + """ + Set interpreter mode flags based on self.args.mode. 
+ + Sets CODE_MODE, SCRIPT_MODE, COMMAND_MODE, VISION_MODE, and CHAT_MODE according to the exact string value of self.args.mode ('code', 'script', 'command', 'vision', 'chat'). If none of the non-code modes are selected, ensures CODE_MODE is True by default. + """ self.CODE_MODE = True if self.args.mode == 'code' else False self.SCRIPT_MODE = True if self.args.mode == 'script' else False self.COMMAND_MODE = True if self.args.mode == 'command' else False @@ -464,6 +682,22 @@ def initialize_mode(self): self.CODE_MODE = True def get_prompt(self, message: str, chat_history: List[dict]) -> List[dict] | str: + """ + Construct the prompt payload appropriate for the current interpreter mode and model. + + Builds a provider-specific messages payload (list of role/content dicts) used for LLM requests, or a plain system string for vision mode. The exact messages and strict instructions differ by mode: + - code/script/command: system and assistant messages enforce returning exactly one fenced code block or single command/script with no extra explanation. + - chat: includes a concise system instruction and, when provided, appends chat_history for context. + - vision: returns a single system instruction string (not a messages list). + For models whose identifier contains "claude", returns a single Anthropic-style user message with nested text chunks. + + Parameters: + message (str): The user's current prompt or task. + chat_history (List[dict]): Optional prior conversation entries to include for chat-mode context; expected as a list of role/content dictionaries. + + Returns: + List[dict] | str: A messages list suitable for the target provider APIs, or a system instruction string when in vision mode. + """ system_message: str = "" assistant_message = "Please generate code wrapped inside triple backticks known as codeblock." 
@@ -526,6 +760,13 @@ def get_prompt(self, message: str, chat_history: List[dict]) -> List[dict] | str return messages def execute_last_code(self, os_name): + """ + Display the most recently saved or generated code and run it for the specified operating system. + + Retrieves the last saved output for the current interpreter mode/language, shows a message if no saved code is available, renders the code to the user, and then attempts execution through the interpreter's guarded execution path. Execution results and errors are displayed to the user and recorded in the logger. + Parameters: + os_name (str): Target operating system identifier used to locate the saved output and to select the platform-specific execution path. + """ try: code_file, code_snippet = self.utility_manager.get_output_history(mode=self.INTERPRETER_MODE, os_name=os_name, language=self.INTERPRETER_LANGUAGE) @@ -551,6 +792,23 @@ def execute_last_code(self, os_name): raise def _extract_latest_user_text(self, message, messages): + """ + Extract the most recent user-provided text from a direct message or a conversation messages list. + + Search order and behavior: + - If `message` is a non-empty string, return it trimmed. + - Otherwise, if `messages` is a list, scan it in reverse for the last item with `"role" == "user"`. + - If that user's `"content"` is a non-empty string, return it trimmed. + - If the user's `"content"` is a list of chunk dicts, extract chunks where `chunk["type"] == "text"`, join their `text` fields with spaces, and return the result if non-empty. + - If no user text is found, return the literal string "Help with this request." + + Parameters: + message: a direct message string or other value to consider first. + messages: an optional list of message dicts (each may include "role" and "content") to search when `message` is empty. + + Returns: + Extracted user text (trimmed) when available, otherwise the fallback string "Help with this request." 
+ """ if isinstance(message, str) and message.strip(): return message.strip() if isinstance(messages, list): @@ -571,6 +829,24 @@ def _extract_latest_user_text(self, message, messages): return "Help with this request." def _run_openai_compatible_completion(self, api_key_name, messages, temperature, max_tokens, api_base, extra_headers=None): + """ + Send a completion request to an OpenAI-compatible LLM provider using credentials from the environment. + + Parameters: + api_key_name (str): Name of the environment variable that holds the API key. + messages (list|dict): Message payload formatted for the model provider. + temperature (float): Sampling temperature for the completion. + max_tokens (int): Maximum tokens to generate. + api_base (str): Base URL for the model API; must be set for custom models. + extra_headers (dict, optional): Additional HTTP headers to include with the request. + + Returns: + The response object returned by litellm.completion for the configured model. + + Raises: + Exception: If the specified API key environment variable is missing. + Exception: If `api_base` equals the string 'None' (i.e., not configured). + """ api_key = os.getenv(api_key_name) if not api_key: raise Exception(f"{api_key_name} not found in .env file.") @@ -590,6 +866,23 @@ def _run_openai_compatible_completion(self, api_key_name, messages, temperature, return litellm.completion(self.INTERPRETER_MODEL, **completion_kwargs) def _generate_browser_use_content(self, message, messages, config_values): + """ + Use the Browser Use API to run a browsing task and return the session result. + + Parameters: + message (str): The latest user message fallback used when extracting the task. + messages (list|None): Conversation message history used to extract the most recent user text. 
+ config_values (Mapping): Configuration overrides; supports keys: + - "api_base": base URL for the Browser Use API (default "https://api.browser-use.com/api/v3") + - "browser_use_timeout": maximum seconds to wait for completion (default 150) + - "browser_use_poll_interval": seconds between poll attempts (default 3) + + Returns: + str: The session output as a string; if the output is a dict or list it is returned as a JSON string. + + Raises: + Exception: If BROWSER_USE_API_KEY is missing, session creation fails or returns no id, the session reports a terminal failure status, or the session times out. + """ api_key = os.getenv("BROWSER_USE_API_KEY") if not api_key: raise Exception("BROWSER_USE_API_KEY not found in .env file.") @@ -647,6 +940,22 @@ def _generate_browser_use_content(self, message, messages, config_values): raise Exception("Browser Use session timed out.") def generate_content(self, message, chat_history: list[tuple[str, str]], temperature=0.1, max_tokens=1024,config_values=None,image_file=None): + """ + Generate model output for the current interpreter mode and return the extracted text content. + + This builds provider-specific request payloads from the given message and chat history, calls the appropriate LLM or vision endpoint (handling provider/model-specific mappings and API base overrides), and returns the utility-extracted textual result. + + Parameters: + message (str): The user's prompt or task to send to the model. + chat_history (list[tuple[str, str]]): Conversation history as (role, text) pairs to include in the request. + temperature (float): Sampling temperature for the model (lower values make outputs more deterministic). + max_tokens (int): Maximum number of tokens to request from the model. + config_values (dict | None): Optional config overrides (e.g., 'temperature', 'max_tokens', 'api_base', 'provider') that influence provider selection and request parameters. 
+ image_file (str | None): Path or URL to an image when using vision models; required for vision-mode Gemini calls. + + Returns: + generated_text (str): The text extracted from the model/vision response suitable for downstream processing or display. + """ self.logger.info(f"Generating content with args: message={message}, chat_history={chat_history}, temperature={temperature}, max_tokens={max_tokens}, config_values={config_values}, image_file={image_file}") self.logger.info(f"Interpreter model selected is '{self.INTERPRETER_MODEL}'") api_base = 'None' @@ -848,6 +1157,18 @@ def generate_content(self, message, chat_history: list[tuple[str, str]], tempera def get_code_prompt(self, task, os_name): + """ + Constructs a concise model instruction that requests a single, executable code block in the configured interpreter language for the given task and operating system. + + The prompt enforces returning exactly one fenced code block with no explanations, comments, or extra prose; requires production-ready syntax and imports; discourages side effects (file creation, network calls, subprocesses, plots, etc.) unless explicitly required by the task; and asks for safe handling of common filesystem/permission errors. + + Parameters: + task (str): Natural-language description of the task to implement. + os_name (str): Target operating system name used to tailor the prompt. + + Returns: + str: A prompt string instructing the model to generate one executable fenced code block in the interpreter language for the specified task and OS. + """ if self.INTERPRETER_LANGUAGE not in ['python', 'javascript']: self.INTERPRETER_LANGUAGE = 'python' @@ -865,6 +1186,19 @@ def get_code_prompt(self, task, os_name): return prompt def get_script_prompt(self, task, os_name): + """ + Builds a concise instruction requesting a single OS-compatible script implementing the given task. + + Parameters: + task (str): The user task or requirement to implement in the script. 
+ os_name (str): Host operating system name or identifier used to select the script language (e.g., macOS/Darwin, Linux, Windows). + + Returns: + str: A prompt string that asks the model to return only the script (no extra text), constrained to an OS-appropriate script type. + + Side effects: + Sets self.INTERPRETER_LANGUAGE to the detected script language for the provided OS. + """ os_name_lower = os_name.lower() # Combined dictionary for both language mapping and script type @@ -892,6 +1226,18 @@ def get_script_prompt(self, task, os_name): return prompt def get_command_prompt(self, task, os_name): + """ + Builds a strict instruction prompt asking the model to produce a single terminal command. + + The returned prompt embeds the user task and target operating system, requires the command to be compatible with the specified OS, and instructs the model to output only the single, simplest safe built-in command without additional text, unrelated chaining, or file generation. + + Parameters: + task (str): The user task or request to be converted into a terminal command. + os_name (str): The target operating system name/version to ensure command compatibility. + + Returns: + prompt (str): A formatted prompt string directing the model to return exactly one terminal command. + """ prompt = ( f"Generate only the single terminal command for this task:\n" f"Task: '{task}'\n" @@ -912,6 +1258,16 @@ def handle_chat_mode(self, task): return prompt def get_mode_prompt(self, task, os_name): + """ + Selects and returns a prompt tailored to the interpreter's active mode for the given task and operating system. + + Parameters: + task (str): The user's task or request to be converted into a prompt. + os_name (str): Operating system identifier used to tailor prompts for scripts/commands (e.g., "linux", "darwin", "windows"). + + Returns: + A mode-appropriate prompt payload for the provided task and OS (for example, a messages list or a system prompt string depending on the active mode). 
+ """ if self.CODE_MODE: self.logger.info("Getting code prompt.") return self.get_code_prompt(task, os_name) @@ -930,6 +1286,27 @@ def get_mode_prompt(self, task, os_name): def execute_code(self, extracted_code, os_name, sandbox_context=None, force_execute=False): # If the interpreter mode is Vision, do not execute the code. + """ + Execute a previously generated code snippet according to the current interpreter mode and execution policy. + + Parameters: + extracted_code (str): The code or command text to execute. + os_name (str): Host operating system identifier used for script/command execution selection. + sandbox_context (optional): Context object returned by the safety manager to constrain execution (passed to the code interpreter). + force_execute (bool): When True, bypasses interactive confirmation and proceeds with execution. + + Behavior: + - Does not execute for 'vision' or 'chat' modes; returns (None, None). + - If neither `force_execute` nor the interpreter's `EXECUTE_CODE` flag is set, prompts the user "Execute the code? (Y/N): " and records the decision in `self._last_execution_approved`. + - Executes using the appropriate code_interpreter method based on mode: + - SCRIPT_MODE -> execute_script(...) + - COMMAND_MODE -> execute_command(...) + - CODE_MODE -> execute_code(...) with `self.INTERPRETER_LANGUAGE` + - Passes `sandbox_context` through to the code interpreter when provided. + + Returns: + (tuple) A pair (stdout, stderr)-style values from the code interpreter on successful invocation, `(None, error_message)` if execution raised an exception, or `(None, None)` if execution was skipped. + """ if self.INTERPRETER_MODE in ['vision', 'chat']: return None, None @@ -959,6 +1336,17 @@ def execute_code(self, extracted_code, os_name, sandbox_context=None, force_exe def interpreter_main(self, version): + """ + Run the interpreter's interactive main loop, processing user tasks and model-driven code generation/execution. 
+ + This starts a REPL that reads tasks from stdin or a prompt file, handles built-in commands (e.g., /exit, /help, /save, /mode, /model, /install, /execute, /fix), constructs mode-appropriate prompts for the LLM, requests and extracts executable blocks, optionally saves and executes generated code (with sandbox/safety checks and package-install/retry/repair flows), opens generated resource files (graphs/charts/tables), and persists session history. The loop also supports switching modes, models, and languages, and prints user-facing messages via the configured UI utilities. + + Parameters: + version (str): Version label for this interpreter session (used for logging and display). + + Raises: + Exception: Re-raises unrecoverable errors encountered during the loop. + """ self.interpreter_version = version self.logger.info(f"Interpreter - v{self.interpreter_version}") diff --git a/libs/logger.py b/libs/logger.py index b7358f3..c67e8a9 100644 --- a/libs/logger.py +++ b/libs/logger.py @@ -9,6 +9,14 @@ class SafeStreamHandler(logging.StreamHandler): """A console handler that degrades non-encodable characters safely on Windows.""" def emit(self, record): + """ + Emit a log record to the handler's stream, falling back to an ASCII-safe representation on encoding errors. + + Attempts to emit the provided logging record normally. If a UnicodeEncodeError occurs while writing to the stream, formats the record and writes an ASCII-safe version (with replacement characters) to the stream, then flushes. If the fallback write fails, delegates error handling to the handler's error handler. + + Parameters: + record (logging.LogRecord): The log record to be emitted. + """ try: super().emit(record) except UnicodeEncodeError: @@ -28,6 +36,15 @@ class Logger: @staticmethod def initialize(filename: str) -> logging.Logger: + """ + Initialize and return a singleton logger configured with a rotating file handler and a console handler. 
+ + Parameters: + filename (str): Path (and logger name) for the rotating log file. If the singleton logger has already been initialized, it is returned as-is regardless of the filename passed. + + Returns: + logging.Logger: The singleton logger configured with a rotating file handler (5MB max per file, 5 backups) and a console stream handler. + """ if Logger._logger is None: Logger._logger = logging.getLogger(filename) Logger._logger.setLevel(logging.DEBUG) diff --git a/libs/safety_manager.py b/libs/safety_manager.py index 71bfb71..6d4b8e8 100644 --- a/libs/safety_manager.py +++ b/libs/safety_manager.py @@ -27,6 +27,20 @@ class RepairCircuitBreaker: attempts: int = 0 def should_continue(self, error_text: str) -> bool: + """ + Decides whether another repair attempt should be made for the given error. + + Normalizes the provided error text to determine uniqueness. If the maximum number + of attempts has been reached or the normalized non-empty error has been seen + before, no further attempts are allowed. Otherwise the normalized error (when + non-empty) is recorded and the attempt count is incremented. + + Parameters: + error_text (str): Raw error message used to assess uniqueness for retries. + + Returns: + bool: `True` if a new attempt is permitted, `False` otherwise. + """ normalized = self._normalize_error(error_text) if self.attempts >= self.max_attempts: return False @@ -39,6 +53,15 @@ def should_continue(self, error_text: str) -> bool: @staticmethod def _normalize_error(error_text: str) -> str: + """ + Normalize an error message string for comparison by trimming, lowercasing, and collapsing internal whitespace. + + Parameters: + error_text (str): The input error text; may be None or empty. + + Returns: + normalized (str): The normalized error text with surrounding whitespace removed, all characters lowercased, and all runs of internal whitespace replaced by single spaces. If input is None or empty, returns an empty string. 
+ """ error_text = (error_text or "").strip().lower() error_text = re.sub(r"\s+", " ", error_text) return error_text @@ -81,9 +104,22 @@ class ExecutionSafetyManager: ] def __init__(self): + """ + Initialize the ExecutionSafetyManager and configure its logger. + + Sets up an instance-level logger that writes to logs/interpreter.log and assigns it to `self.logger`. + """ self.logger = Logger.initialize("logs/interpreter.log") def build_sandbox_context(self) -> SandboxContext: + """ + Create a new sandboxed execution context with a temporary working directory and a restricted environment. + + The returned SandboxContext contains a newly created temporary directory as `cwd`, an `env` mapping that includes only allowed environment variables plus `PYTHONIOENCODING="utf-8"`, and a default `timeout_seconds` of 30. + + Returns: + SandboxContext: The sandbox context with `cwd` (temporary directory path), `env` (whitelisted environment variables and `PYTHONIOENCODING`), and `timeout_seconds` set to 30. + """ env = {} for key in self.SAFE_ENV_KEYS: if os.getenv(key): @@ -94,10 +130,32 @@ def build_sandbox_context(self) -> SandboxContext: return SandboxContext(cwd=cwd, env=env, timeout_seconds=30) def cleanup_sandbox_context(self, context: SandboxContext | None): + """ + Recursively remove the sandbox working directory if it exists. + + If `context` is provided and `context.cwd` points to an existing directory, that directory + and its contents are removed. Errors during removal are ignored. If `context` is `None`, + `context.cwd` is falsy, or the path does not exist, the function does nothing. + + Parameters: + context (SandboxContext | None): Sandbox context containing the `cwd` to delete. + """ if context and context.cwd and os.path.exists(context.cwd): shutil.rmtree(context.cwd, ignore_errors=True) def assess_execution(self, content: str, mode: str) -> SafetyDecision: + """ + Evaluate whether generated content is safe to execute in the sandbox. 
+ + Checks that `content` is non-empty, rejects inputs that match the manager's configured dangerous patterns, and enforces mode-specific constraints (for `mode == "command"`, the content must be a single command line). + + Parameters: + content (str): The generated text to assess. + mode (str): Execution mode; `"command"` requires a single-line command, other values impose only pattern-based checks. + + Returns: + SafetyDecision: `allowed` is `True` when no safety violations were found; `reasons` lists detected issues (e.g., empty output, matched dangerous patterns, or multi-line command in command mode). + """ if not content or not content.strip(): return SafetyDecision(False, ["Generated output is empty."]) diff --git a/libs/terminal_ui.py b/libs/terminal_ui.py index 47b8b3b..27ec0a9 100644 --- a/libs/terminal_ui.py +++ b/libs/terminal_ui.py @@ -12,10 +12,30 @@ class TerminalUI: def __init__(self): + """ + Initialize the TerminalUI by creating a Rich Console for rendering and a UtilityManager for terminal utilities. + + Attributes: + console: Rich Console instance used for rendering UI elements. + utility_manager: UtilityManager instance used for screen management and model utilities. + """ self.console = Console() self.utility_manager = UtilityManager() def _read_key(self): + """ + Read a single keypress from stdin and normalize special keys to logical names. + + On Windows, reads a wide character via msvcrt and decodes extended key sequences into + arrow directions. On Unix-like systems, puts stdin into raw mode, reads one (or + few) byte(s) to decode ANSI escape sequences for arrows, and ensures terminal + settings are restored before returning. + + Returns: + str: One of the normalized tokens `'up'`, `'down'`, `'left'`, `'right'`, `'enter'`, + or `'escape'` for those special keys; otherwise the single-character string + that was pressed. 
+ """ if os.name == 'nt': import msvcrt key = msvcrt.getwch() @@ -48,6 +68,18 @@ def _read_key(self): termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) def _render_selector(self, title, options, selected_index, help_text, default): + """ + Render a scrollable terminal selector UI and display the current selection. + + Renders an interactive-looking selector using the console: clears the screen, prints the title, a table of visible option rows with a marker for the current selection, a footer/help panel, and a "Selected" line showing the option at the current index. + + Parameters: + title (str): Heading displayed above the options. + options (Sequence[str]): List of option labels to display. + selected_index (int): Index of the option to mark as selected and center in the visible window when possible. + help_text (Optional[str]): Text shown in the footer panel; when falsy, a default usage hint is shown. + default (Optional[str]): Option label to annotate with " (default)" when present among the visible items. + """ self.utility_manager.clear_screen() visible_rows = max(8, min(14, self.console.size.height - 10)) start_index = max(0, selected_index - visible_rows // 2) @@ -74,6 +106,27 @@ def _render_selector(self, title, options, selected_index, help_text, default): self.console.print(f'Selected: [bold]{options[selected_index]}[/bold]') def _select_option(self, title, options, default, help_text=None): + """ + Present a selectable list of options to the user and return the chosen option. + + If stdin is not a TTY, prompts once using the provided title and returns + an exact or case-insensitive match from `options`, falling back to a valid + default. If stdin is a TTY, displays an interactive selector that accepts + Up/Down (or k/j), single-character prefix selection, Enter to confirm, and + Escape to cancel. + + Parameters: + title (str): Prompt title displayed to the user. + options (list[str]): Non-empty list of option strings to choose from. 
+ default (str): Default option to pre-select or fall back to if no match. + help_text (str | None): Optional help text shown in the selector footer. + + Returns: + str: The selected option (one of the entries from `options`). + + Raises: + KeyboardInterrupt: If the user cancels the selection (Escape) in interactive mode. + """ if not sys.stdin.isatty(): default_choice = default if default in options else options[0] answer = Prompt.ask(f"{title}", default=default_choice).strip() @@ -109,14 +162,42 @@ def _select_option(self, title, options, default, help_text=None): break def _select_boolean(self, title, default=False): + """ + Prompt the user to choose between 'yes' and 'no' and return the selection as a boolean. + + Parameters: + title (str): The prompt title displayed to the user. + default (bool): The default selection when no explicit choice is made. + + Returns: + bool: `True` if 'yes' is selected, `False` otherwise. + """ default_choice = 'yes' if default else 'no' choice = self._select_option(title, ['yes', 'no'], default_choice, 'Use Up/Down arrows and Enter to choose.') return choice == 'yes' def select_mode(self, default_mode='code'): + """ + Prompt the user to choose an interpreter mode. + + Parameters: + default_mode (str): Mode to preselect in the chooser; if it is not a valid choice, the first available mode is used. + + Returns: + str: The selected mode, one of 'code', 'chat', 'script', 'command', or 'vision'. + """ return self._select_option('Mode', ['code', 'chat', 'script', 'command', 'vision'], default_mode) def select_model(self, default_model=None): + """ + Prompt the user to choose a model from the list of available models. + + Parameters: + default_model (str | None): Preferred model name to preselect. If None, the utility manager's default is used; if that default is not in the available list, the first available model is used. + + Returns: + str: The chosen model name from the available models. 
+ """ models = self.utility_manager.list_available_models() default_model = default_model or self.utility_manager.get_default_model_name() if default_model not in models: @@ -124,12 +205,54 @@ def select_model(self, default_model=None): return self._select_option('Model', models, default_model, 'Use Up/Down arrows, Enter, or type the first letter to jump.') def select_language(self, default_lang='python'): + """ + Prompt the user to choose a programming language from available options. + + Parameters: + default_lang (str): Language to pre-select or fall back to if the user provides no valid selection. + + Returns: + str: The selected language, either 'python' or 'javascript'. + """ return self._select_option('Language', ['python', 'javascript'], default_lang) def select_boolean(self, title, default=False): + """ + Prompt the user to choose between "yes" and "no". + + Parameters: + title (str): The prompt title shown to the user. + default (bool): The default choice used when no explicit selection is made. + + Returns: + bool: `True` if the user selects "yes", `False` otherwise. + """ return self._select_boolean(title, default=default) def interactive_settings(self, interpreter): + """ + Prompt the user for interpreter settings using interactive selectors and return the chosen configuration. + + Parameters: + interpreter: An interpreter-like object used to read current defaults. 
Recognized attributes (if present) are: + - INTERPRETER_MODEL_LABEL or INTERPRETER_MODEL: default model name + - INTERPRETER_MODE: default mode (defaults to "code") + - INTERPRETER_LANGUAGE: default language (defaults to "python") + - DISPLAY_CODE: default for displaying generated code (bool) + - EXECUTE_CODE: default for auto-executing code (bool) + - SAVE_CODE: default for saving generated output (bool) + - INTERPRETER_HISTORY: default for enabling history (bool) + + Returns: + dict: A mapping with the selected settings: + - "mode" (str): selected interpreter mode + - "model" (str): selected model name + - "language" (str): selected language + - "display_code" (bool): whether to display generated code automatically + - "execute_code" (bool): whether to execute generated code automatically + - "save_code" (bool): whether to save generated output automatically + - "history" (bool): whether history memory is enabled + """ current_model = getattr(interpreter, "INTERPRETER_MODEL_LABEL", None) or getattr(interpreter, "INTERPRETER_MODEL", None) current_mode = getattr(interpreter, "INTERPRETER_MODE", "code") current_lang = getattr(interpreter, "INTERPRETER_LANGUAGE", "python") @@ -153,6 +276,23 @@ def interactive_settings(self, interpreter): } def launch(self, args): + """ + Present interactive prompts for interpreter settings (mode, model, language and related booleans) and return a resolved argparse.Namespace. + + Prompts the user for mode, model, and language (using provided CLI values as defaults). For appropriate modes, prompts for booleans controlling display of generated code, automatic execution, saving output, and history when those flags are not supplied on the CLI. Clears the terminal and renders a summary panel of the chosen configuration before requesting any additional boolean choices. + + Parameters: + args (argparse.Namespace): CLI arguments and flags. Expected attributes: + - mode, model, lang: optional initial choices for mode, model, and language. 
+ - display_code, exec, save_code, history: optional boolean flags that, if falsy, may trigger interactive prompts. + - file: path or identifier to include in the returned namespace. + - unsafe (optional): passed through; defaults to False if missing. + - upgrade, cli, tui: passthrough flags included in the returned namespace. + + Returns: + argparse.Namespace: Namespace containing the resolved settings with keys: + exec, save_code, mode, model, display_code, lang, file, history, unsafe, upgrade, cli, tui. + """ mode = self.select_mode(args.mode or 'code') model = self.select_model(args.model or self.utility_manager.get_default_model_name()) language = self.select_language(args.lang or 'python') diff --git a/libs/utility_manager.py b/libs/utility_manager.py index 352acbb..139e912 100644 --- a/libs/utility_manager.py +++ b/libs/utility_manager.py @@ -28,6 +28,11 @@ class UtilityManager: ] def __init__(self): + """ + Initialize the UtilityManager by preparing logging infrastructure and creating the logger. + + Ensures a 'logs' directory exists and creates an empty 'logs/interpreter.log' file if missing, then initializes self.logger via Logger.initialize("logs/interpreter.log"). On any exception during setup, logs an error message and re-raises the exception. + """ try: if not os.path.exists('logs'): os.makedirs('logs') @@ -96,6 +101,14 @@ def get_os_platform(self): raise def initialize_readline_history(self): + """ + Initialize readline-based interactive input history and register it to be saved on process exit. + + Attempts to import `readline`, falling back to `pyreadline` on platforms where that is required. If a history file `~/.python_history` exists it is loaded, and `readline.write_history_file` is registered with `atexit` to persist history on exit. If readline support is unavailable or initialization fails, no exception is raised and the function returns `False`. 
+ + Returns: + True if history support was successfully initialized and persistence was registered, False otherwise. + """ try: try: import readline @@ -130,6 +143,21 @@ def initialize_readline_history(self): return False def read_config_file(self, filename=".config"): + """ + Parse a simple key/value configuration file into a dictionary. + + The file is read line-by-line; lines starting with `#` and lines without an `=` are ignored. + Each remaining line is split at the first `=` into a key and value, and both are stripped of surrounding whitespace. + + Parameters: + filename (str): Path to the configuration file (default: ".config"). + + Returns: + dict: Mapping of configuration keys to their corresponding string values. + + Raises: + Exception: Propagates any exception raised while opening or reading the file. + """ try: config_data = {} with open(filename, "r") as config_file: @@ -145,6 +173,18 @@ def read_config_file(self, filename=".config"): raise def list_available_models(self, configs_path=None): + """ + List available model names from `.config` files in a directory. + + Parameters: + configs_path (str): Optional path to the directory containing `.config` files. If omitted, defaults to the `configs` subdirectory of the current working directory. + + Returns: + models (list[str]): Sorted list of model names derived from the `.config` filenames (filename stem without extension). + + Raises: + Exception: Re-raises any exception encountered while accessing or listing the directory after logging the error. + """ try: configs_path = configs_path or os.path.join(os.getcwd(), 'configs') configs_files = [file for file in os.listdir(configs_path) if file.endswith('.config')] @@ -155,6 +195,14 @@ def list_available_models(self, configs_path=None): @staticmethod def get_default_model_name(): + """ + Selects the default model name based on environment-configured API keys. 
+ + Loads environment variables from a `.env` file in the current working directory without overriding existing environment variables, then returns the first model name from UtilityManager.DEFAULT_MODELS whose corresponding environment variable is set. If none are set, returns "gpt-4o". + + Returns: + str: The chosen default model name. + """ env_path = os.path.join(os.getcwd(), ".env") load_dotenv(dotenv_path=env_path, override=False) @@ -164,6 +212,20 @@ def get_default_model_name(): return "gpt-4o" def extract_file_name(self, prompt): + """ + Extracts the first plausible file path, filename, or URL with a recognized (non-binary) extension from the given prompt. + + Searches the prompt for a filesystem path, bare filename, or HTTP/HTTPS URL that ends with a file extension and returns the matched substring only if its extension is one of: .json, .csv, .xml, .xls, .txt, .md, .html, .png, .jpg, .jpeg, .gif, .svg, .zip, .tar, .gz, .7z, .rar. + + Parameters: + prompt (str): Text to scan for a filename, path, or URL. + + Returns: + str or None: The matched filename/path/URL when an allowed extension is found, `None` if no match or the extension is not allowed. + + Raises: + Exception: Re-raises any unexpected exception encountered while extracting the filename. + """ try: # This pattern looks for typical file paths, names, and URLs, then stops at the end of the extension pattern = r"((?:[a-zA-Z]:\\(?:[\w\-\.]+\\)*|/(?:[\w\-\.]+/)*|\b[\w\-\.]+\b|https?://[\w\-\.]+/[\w\-\.]+/)*[\w\-\.]+\.\w+)" @@ -258,6 +320,11 @@ def get_output_history(self, mode='code', os_name=None, language=None): raise def display_help(self): + """ + Display interpreter usage, startup flags, and available commands. 
+ + Shows the supported startup flags (for example, `--cli` and `--tui`) and a concise list of all interpreter commands such as `/exit`, `/execute`, `/install`, `/save`, `/edit`, `/fix`, `/mode`, `/model`, `/language`, `/history`, `/clear`, `/help`, `/list`, `/version`, `/debug`, `/prompt`, `/upgrade`, and `/shell`. + """ display_markdown_message("Interpreter\n\ \n\ Startup flags:\n\ @@ -337,6 +404,11 @@ def _download_file(url, file_name): @staticmethod def upgrade_interpreter(): + """ + Upgrade the local code interpreter and optionally its Python dependencies. + + Downloads a requirements.txt from the project repository, runs pip to upgrade the open-code-interpreter package, and—if the requirements file was downloaded successfully—installs or upgrades packages from the downloaded requirements.txt. Progress is reported via markdown messages and relevant events and command output are logged; if a command produces output, the output is displayed and its first 100 characters are logged. + """ code_interpreter = CodeInterpreter() logger = Logger.initialize("logs/interpreter.log") # Download the requirements file diff --git a/scripts/validate_models_cli.py b/scripts/validate_models_cli.py index 32526f8..ecf4b93 100644 --- a/scripts/validate_models_cli.py +++ b/scripts/validate_models_cli.py @@ -69,6 +69,18 @@ class ModelConfig: def parse_hf_model(config_path: Path) -> str: + """ + Extracts the HF_MODEL value from a model config file. + + Parameters: + config_path (Path): Path to a `.config` file to read (UTF-8 with BOM tolerated). + + Returns: + str: The HF_MODEL value with surrounding whitespace and quotes removed. + + Raises: + ValueError: If no `HF_MODEL` assignment is found in the file. 
+ """ for line in config_path.read_text(encoding="utf-8-sig").splitlines(): stripped = line.strip() if stripped.startswith("HF_MODEL") and "=" in stripped: @@ -77,6 +89,15 @@ def parse_hf_model(config_path: Path) -> str: def parse_provider(config_path: Path) -> str | None: + """ + Extracts the provider name from a config file, if specified. + + Parameters: + config_path (Path): Path to a UTF-8 (BOM-tolerant) .config file to scan for a `provider = ...` assignment. + + Returns: + str | None: The provider value lowercased with surrounding quotes and whitespace removed, or `None` if no provider assignment is found. + """ for line in config_path.read_text(encoding="utf-8-sig").splitlines(): stripped = line.strip() if stripped.startswith("provider") and "=" in stripped: @@ -85,6 +106,16 @@ def parse_provider(config_path: Path) -> str | None: def infer_provider(hf_model: str, explicit_provider: str | None = None) -> str: + """ + Infer the canonical provider identifier for a model based on its Hugging Face model string, with an optional explicit override. + + Parameters: + hf_model (str): The model identifier from the config (used to infer provider by inspecting common prefixes and substrings). + explicit_provider (str | None): If provided, this value is returned unchanged and no inference is performed. + + Returns: + str: The provider identifier (e.g., "openai", "nvidia", "z-ai", "browser-use", "gemini", "anthropic", "groq", "deepseek", "local", or "huggingface"). + """ if explicit_provider: return explicit_provider if hf_model.startswith(("gpt", "o1", "o3", "o4", "gpt-5")): @@ -109,11 +140,25 @@ def infer_provider(hf_model: str, explicit_provider: str | None = None) -> str: def infer_tier(alias: str, hf_model: str) -> str: + """ + Determine whether a model configuration is in the "preview" tier or "stable" tier. + + Returns: + 'preview' if either the alias or HF model string contains the substring "preview" (case-insensitive), 'stable' otherwise. 
+ """ text = f"{alias} {hf_model}".lower() return "preview" if "preview" in text else "stable" def list_model_configs() -> list[ModelConfig]: + """ + Discover and parse model configuration files into ModelConfig objects. + + Scans CONFIGS_DIR for files matching `*.config` (sorted by filename), extracts each file's alias (stem), `HF_MODEL` value, optional `provider` setting, infers a provider when missing, determines the tier, and returns a list of populated ModelConfig instances. + + Returns: + list[ModelConfig]: List of model configurations with `alias`, `hf_model`, `provider`, and `tier`. + """ models: list[ModelConfig] = [] for config_path in sorted(CONFIGS_DIR.glob("*.config")): alias = config_path.stem @@ -126,10 +171,25 @@ def list_model_configs() -> list[ModelConfig]: def parse_csv_set(value: str) -> set[str]: + """ + Parse a comma-separated string into a set of normalized tokens. + + Parameters: + value (str): Comma-separated items to parse. + + Returns: + set[str]: Lowercased, trimmed, non-empty items from `value`. + """ return {item.strip().lower() for item in value.split(",") if item.strip()} def can_run_provider(provider: str) -> tuple[bool, str]: + """ + Determine whether the given provider is eligible to run in the cloud smoke test matrix. + + Returns: + tuple: A pair where the first element is `True` if the provider is allowed to run (required API key present) and `False` otherwise. The second element is a human-readable status message explaining the result (e.g., `"READY"` or `"SKIPPED (...)"`). + """ if provider == "local": return False, "SKIPPED (local provider not part of cloud smoke matrix)" env_key = PROVIDER_API_KEYS.get(provider) @@ -141,6 +201,17 @@ def can_run_provider(provider: str) -> tuple[bool, str]: def build_stdin(mode: str) -> str: + """ + Construct stdin input tailored to the given interpreter mode. + + For "chat" this returns a single-sentence chat prompt. For "vision" this returns a path to a test image. 
For any other mode it returns a tiny code/example prompt. All returned strings end with a line containing "/exit" to terminate the interactive session. + + Parameters: + mode (str): Interpreter mode; expected values include "chat", "vision", or other modes (e.g., "code", "script", "command"). + + Returns: + str: The complete stdin payload (including trailing newlines and the "/exit" line) to feed to the interpreter. + """ if mode == "chat": return "Say hello in one sentence.\n/exit\n" if mode == "vision": @@ -155,6 +226,19 @@ def run_cli_smoke( interpreter_path: Path, timeout: int, ) -> tuple[str, str]: + """ + Run the interpreter CLI for the given model alias using the specified Python binary and classify the captured output as PASS, FAIL, or SKIP. + + Parameters: + alias (str): Module alias passed to the interpreter with `-m`. + mode (str): Mode flag passed to the interpreter (`-md`). + python_bin (str): Path to the Python executable to invoke. + interpreter_path (Path): Path to the interpreter script to run. + timeout (int): Maximum seconds to wait for the interpreter process. + + Returns: + tuple[str, str]: A pair where the first element is one of `"PASS"`, `"FAIL"`, or `"SKIP"`, and the second is a short human-readable message describing the classification (e.g., reason for skip, failure detail, or success note). + """ cmd = [python_bin, str(interpreter_path), "-m", alias, "-md", mode, "-dc"] stdin_data = build_stdin(mode) try: @@ -189,6 +273,17 @@ def filter_models( providers: set[str], tier: str, ) -> list[ModelConfig]: + """ + Filter a sequence of ModelConfig objects by provider membership and optional tier. + + Parameters: + models (Iterable[ModelConfig]): Iterable of model configurations to filter. + providers (set[str]): Set of provider names; only models whose `provider` is in this set are kept. + tier (str): If "stable" or "preview", only models with a matching `tier` are kept; any other value (e.g., "all") disables tier filtering. 
+ + Returns: + list[ModelConfig]: List of models that match the provider set and optional tier filter. + """ filtered = [m for m in models if m.provider in providers] if tier in {"stable", "preview"}: filtered = [m for m in filtered if m.tier == tier] @@ -196,6 +291,14 @@ def filter_models( def main() -> int: + """ + Run CLI smoke validation for selected model configurations and return an exit code. + + Parses command-line options to select providers, tier, mode, timeouts, Python executable, and interpreter entrypoint; discovers and filters model configs; for each eligible model invokes the interpreter CLI, classifies the result as PASS/FAIL/SKIP, prints per-model lines and a final summary, and accumulates pass/fail/skip counts. + + Returns: + int: `0` if no models failed; `1` if one or more models failed or if no models matched the filters. + """ load_dotenv(dotenv_path=ROOT_DIR / ".env", override=False) parser = argparse.ArgumentParser(description="Validate model configs via interpreter CLI smoke checks") diff --git a/tests/test_interpreter.py b/tests/test_interpreter.py index 2dfe066..873ec7b 100644 --- a/tests/test_interpreter.py +++ b/tests/test_interpreter.py @@ -19,6 +19,18 @@ def _read_hf_model(config_path: Path) -> str: + """ + Read a config file and return the value assigned to HF_MODEL. + + Parameters: + config_path (Path): Path to the configuration file to parse. + + Returns: + str: The HF_MODEL value with surrounding quotes and whitespace removed. + + Raises: + AssertionError: If no HF_MODEL entry is found in the file. + """ for line in config_path.read_text(encoding="utf-8-sig").splitlines(): stripped = line.strip() if stripped.startswith("HF_MODEL") and "=" in stripped: @@ -27,6 +39,15 @@ def _read_hf_model(config_path: Path) -> str: def _expected_completion_model(model_name: str) -> str: + """ + Map a raw model identifier to the canonical model identifier used for completion requests. 
+ + Parameters: + model_name (str): The model identifier read from configuration (e.g., "claude-2.1", "groq-llama-3.3", "gpt-4o", or plain model names). + + Returns: + str: The canonical model identifier to pass to provider clients. Known legacy or provider-specific aliases are remapped to their modern equivalents (for example, Claude and Groq legacy names are translated, Deepseek entries are given a "deepseek/" prefix when it is missing, and most non-prefixed models are returned with a "huggingface/" prefix). If no remapping applies, returns the original `model_name`. + """ if model_name.startswith(("gpt", "o1", "o3", "o4")): return model_name @@ -96,6 +117,25 @@ class TestInterpreter(unittest.TestCase): def _make_args(self, mode="code", model="code-llama"): + """ + Create a Namespace of CLI-like interpreter arguments used by tests. + + Parameters: + mode (str): Interpreter mode to set (e.g., "code", "vision"). + model (str): Model name to set as the interpreter model. + + Returns: + argparse.Namespace: A Namespace populated with default CLI options: + exec (bool): False + save_code (bool): False + mode (str): Provided mode + model (str): Provided model + display_code (bool): False + lang (str): "python" + file (None|str): None + history (bool): False + upgrade (bool): False + """ return Namespace( exec=False, save_code=False, @@ -119,6 +159,11 @@ def test_mode_is_initialized_from_args(self, _mock_history, _mock_client): @patch("libs.interpreter_lib.Interpreter.initialize_client", return_value=None) @patch("libs.utility_manager.UtilityManager.initialize_readline_history", return_value=None) def test_openai_o_series_uses_openai_path(self, _mock_history, _mock_client): + """ + Verifies that OpenAI O-series models are routed through the OpenAI-compatible completion path. 
+ + Asserts that generate_content invokes the litellm completion function once with the O-series model identifier ("o1-mini") and returns the extracted content ("ok"). + """ interpreter = Interpreter(self._make_args(model="o1-mini")) interpreter.INTERPRETER_MODEL = "o1-mini" @@ -139,6 +184,11 @@ def test_openai_o_series_uses_openai_path(self, _mock_history, _mock_client): @patch("libs.interpreter_lib.Interpreter.initialize_client", return_value=None) @patch("libs.utility_manager.UtilityManager.initialize_readline_history", return_value=None) def test_legacy_claude_alias_is_remapped_to_sonnet_46(self, _mock_history, _mock_client): + """ + Verifies that the legacy Claude model alias "claude-2.1" is remapped to "claude-sonnet-4-6" when invoking completions. + + When Interpreter.INTERPRETER_MODEL is set to "claude-2.1", calling generate_content should invoke the completion API with the mapped model identifier "claude-sonnet-4-6". + """ interpreter = Interpreter(self._make_args(model="claude-2.1")) interpreter.INTERPRETER_MODEL = "claude-2.1" @@ -179,6 +229,11 @@ def test_initialize_client_uses_shared_default_model_when_missing( self.assertEqual(interpreter.INTERPRETER_MODEL_LABEL, "groq-gpt-oss-20b") def test_every_config_is_parseable_and_has_hf_model(self): + """ + Verify all configuration files under CONFIGS_DIR are parseable and define a non-empty HF_MODEL. + + Loads each `*.config` file and asserts the parsed values include an `HF_MODEL` key whose value is not empty after trimming whitespace. + """ utility_manager = UtilityManager() config_files = sorted(CONFIGS_DIR.glob("*.config")) self.assertTrue(config_files, "No config files found") @@ -265,6 +320,12 @@ def test_extract_code_prefers_triple_backticks_when_config_uses_single_backtick( self.assertEqual(extracted, "print('OK')") def test_legacy_alias_configs_are_mapped_to_modern_targets(self): + """ + Verifies that legacy config filenames are remapped to their expected modern `HF_MODEL` values. 
+ + For each legacy config in a predefined mapping, reads the config's `HF_MODEL` via `_read_hf_model` + and asserts it equals the expected modern target. Each mapping is executed as a unittest subTest. + """ expected_aliases = { "gpt-3.5-turbo.config": "gpt-4o-mini", "gpt-4.config": "gpt-4.1", @@ -285,6 +346,11 @@ def test_legacy_alias_configs_are_mapped_to_modern_targets(self): self.assertEqual(hf_model, expected_hf_model) def test_new_provider_configs_exist(self): + """ + Verify that a set of required provider config files exist and specify the expected HF_MODEL values. + + For each config filename in the required set, read its HF_MODEL entry and assert it matches the expected canonical model identifier. + """ required_configs = { "openrouter-free.config": "openrouter/free", "nvidia-nemotron.config": "nvidia/nemotron-3-super-120b-a12b",