Skip to content

[Misc] Batch API Service, working with tempory existing local batch driver #1298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

zhangjyr
Copy link
Collaborator

@zhangjyr zhangjyr commented Jul 17, 2025

Pull Request Description

This PR implemented extended API server with Batch support implemented in Python. In this version, API server uses existing batch driver to simulate batching process locally.

I added an e2e test to make sure API server works. The files service added in this PR is for e2e test only.

Related Issues

Resolves: part of #1277

Important: Before submitting, please complete the description above and review the checklist below.


Contribution Guidelines (Expand for Details)

We appreciate your contribution to aibrix! To ensure a smooth review process and maintain high code quality, please adhere to the following guidelines:

Pull Request Title Format

Your PR title should start with one of these prefixes to indicate the nature of the change:

  • [Bug]: Corrections to existing functionality
  • [CI]: Changes to build process or CI pipeline
  • [Docs]: Updates or additions to documentation
  • [API]: Modifications to aibrix's API or interface
  • [CLI]: Changes or additions to the Command Line Interface
  • [Misc]: For changes not covered above (use sparingly)

Note: For changes spanning multiple categories, use multiple prefixes in order of importance.

Submission Checklist

  • PR title includes appropriate prefix(es)
  • Changes are clearly explained in the PR description
  • New and existing tests pass successfully
  • Code adheres to project style and best practices
  • Documentation updated to reflect changes (if applicable)
  • Thorough testing completed, no regressions introduced

By submitting this PR, you confirm that you've read these guidelines and your changes align with the project's contribution standards.

Jingyuan Zhang added 5 commits July 16, 2025 12:02
Support async create_job
Lint Fix

Signed-off-by: Jingyuan Zhang <[email protected]>
Signed-off-by: Jingyuan Zhang <[email protected]>
Signed-off-by: Jingyuan Zhang <[email protected]>
@zhangjyr zhangjyr requested review from Jeffwan and varungup90 July 17, 2025 06:41
@zhangjyr zhangjyr added kind/enhancement New feature or request kind/feature Categorizes issue or PR as related to a new feature. labels Jul 17, 2025
Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @zhangjyr, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a robust and extensible Batch API service. It significantly refactors the underlying batch processing system to support both local execution and Kubernetes-native job management, all while providing an OpenAI-compatible interface. The changes lay a strong foundation for future scalability and integration with diverse backends, complemented by improved observability through structured logging and thorough testing.

Highlights

  • Batch API Service: I've implemented an extended API server with OpenAI-compatible Batch API support, allowing users to create, retrieve, cancel, and list batch jobs.
  • Structured Job Entity Model: A new set of Pydantic models (e.g., BatchJob, BatchJobSpec, BatchJobStatus) has been introduced to represent batch jobs in a structured, Kubernetes-inspired format, enhancing data consistency and clarity.
  • Pluggable Job Management Architecture: The core batch processing logic has been refactored to support different job execution backends through an abstract JobEntityManager interface. This design enables seamless integration with various systems like Kubernetes, or cloud storage solutions like S3, in the future.
  • Kubernetes Integration (Kopf): A new JobCache component has been added, leveraging the kopf framework to manage batch jobs as native Kubernetes Job resources. This allows batch processing to scale and be managed directly within a Kubernetes cluster.
  • Enhanced Local Batch Driver: The existing local batch driver has been updated to align with the new structured job model and includes proper asynchronous shutdown mechanisms, ensuring reliable operation for local simulations.
  • Structured Logging: Logging across the new and modified components now uses structlog to produce consistent, machine-readable JSON logs, which will greatly improve observability and debugging.
  • Comprehensive End-to-End Testing: Extensive end-to-end tests have been added for the new OpenAI Batch API, covering file uploads, the entire batch job lifecycle (creation, status polling, output download), and various error handling scenarios.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-structured Batch API service. The use of a JobEntityManager abstraction for different backends is a great design choice, and the new FastAPI endpoints with e2e tests are excellent additions. I've identified a few critical issues related to asynchronous programming and error handling that should be addressed. I also have some suggestions to improve maintainability and robustness. Overall, this is a very strong contribution.

Comment on lines +63 to 65
if len(requests) == 0:
logger.warning("Can not read inputs from storage", file_id=input_id)
return requests[0]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There's a potential IndexError here. If _storage.get_job_input_requests returns an empty list, the logger.warning is called, but the code proceeds to return requests[0], which will crash.

You should handle the case where requests is empty, for example by returning None or raising an exception, to prevent the unhandled exception.

Suggested change
if len(requests) == 0:
logger.warning("Can not read inputs from storage", file_id=input_id)
return requests[0]
if not requests:
logger.warning("Can not read inputs from storage", file_id=input_id)
return None # Or raise an exception
return requests[0]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be refactor in next PR

Comment on lines 539 to 547
for req_id in executed_requests:
if req_id < 0 or req_id >= request_len:
print(f"makr job {job_id} progress, request index out of boundary!")
logger.error( # type: ignore[call-arg]
"Mark job progress failed - request index out of boundary",
job_id=job_id,
)
invalid_flag = True
continue
meta_data.complete_one_request(req_id)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In mark_job_progress, if an invalid req_id is encountered, invalid_flag is set to True, but the loop continues to process other request IDs. This could lead to a partial update of the job's progress. Furthermore, the boolean return value of this method is not checked by its caller in request_proxy.py, so this error condition would fail silently.

Consider raising an exception or stopping the loop immediately when an invalid req_id is found to ensure errors are handled explicitly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bug will be handled when we refactor local scheduling.


job = self._in_progress_jobs[job_id]
assert isinstance(job, JobMetaInfo)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using assert isinstance(job, JobMetaInfo) creates a tight coupling between the JobManager and the JobMetaInfo class, which is marked as legacy. This makes the system more fragile and harder to refactor in the future.

To improve maintainability, consider decoupling the progress-tracking logic from the BatchJob data model. The logic within JobMetaInfo could be moved into the JobManager or a dedicated progress-tracking class that operates on BatchJob objects without requiring inheritance.

Jingyuan Zhang and others added 2 commits July 17, 2025 13:59
@Jeffwan
Copy link
Collaborator

Jeffwan commented Jul 20, 2025

Let's hold this PR. We need to discuss the v0.4.0 cut off date. It's better to merge it after the cut off. Hope it doesn't block the development process

Signed-off-by: Jingyuan Zhang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/enhancement New feature or request kind/feature Categorizes issue or PR as related to a new feature.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants