Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion diracx-api/src/diracx/api/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

__all__ = ("create_sandbox", "download_sandbox")
__all__ = ("create_sandbox", "download_sandbox", "submit_jobs")

import hashlib
import logging
Expand All @@ -13,6 +13,7 @@

import httpx
import zstandard
from DIRACCommon.Core.Utilities.ClassAd.ClassAdLight import ClassAd

from diracx.client.aio import AsyncDiracClient
from diracx.client.models import SandboxInfo
Expand Down Expand Up @@ -123,3 +124,26 @@ async def download_sandbox(pfn: str, destination: Path, *, client: AsyncDiracCli
with tarfile_open(fh) as tf:
tf.extractall(path=destination, filter="data")
logger.debug("Extracted %s to %s", pfn, destination)


@with_client
async def submit_jobs(jdls: list[str], *, client: AsyncDiracClient):
# Create and upload InputSandboxes from JDLs
for i, jdl in enumerate(jdls):
# Fix possible lack of brackets
if jdl.strip()[0] != "[":
jdl = f"[{jdl}]"

class_ad_job = ClassAd(jdl)
if class_ad_job.lookupAttribute("InputSandbox"):
isb = class_ad_job.getListFromExpression("InputSandbox")
sandboxes_pfn = await create_sandbox(
paths=[Path(file_path) for file_path in isb]
)
logging.info(f"InputSandbox created: {sandboxes_pfn[13:]}")
class_ad_job.set_expression("InputSandbox", {sandboxes_pfn})

jdls[i] = class_ad_job.asJDL()

jobs = await client.jobs.submit_jdl_jobs(list(jdls))
return jobs
8 changes: 5 additions & 3 deletions diracx-cli/src/diracx/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from rich.table import Table
from typer import FileText, Option

from diracx.api.jobs import submit_jobs
from diracx.client.aio import AsyncDiracClient
from diracx.core.models import ScalarSearchOperator, SearchSpec, VectorSearchOperator
from diracx.core.preferences import OutputFormats, get_diracx_preferences
Expand Down Expand Up @@ -150,9 +151,10 @@ def display_rich(data, content_range: ContentRange) -> None:


@app.async_command()
async def submit(jdl: list[FileText]):
async with AsyncDiracClient() as api:
jobs = await api.jobs.submit_jdl_jobs([x.read() for x in jdl])
async def submit(jdls: list[FileText]):
jdls_values = [jdl.read() for jdl in jdls]

jobs = await submit_jobs(jdls_values)
print(
f"Inserted {len(jobs)} jobs with ids: {','.join(map(str, (job.job_id for job in jobs)))}"
)
5 changes: 3 additions & 2 deletions diracx-logic/src/diracx/logic/jobs/sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,15 @@ async def assign_sandbox_to_job(
job_id: int,
pfn: str,
sandbox_metadata_db: SandboxMetadataDB,
sandbox_type: Literal["input", "output"],
settings: SandboxStoreSettings,
):
"""Map the pfn as output sandbox to job."""
"""Map the pfn as input or output sandbox to job."""
short_pfn = pfn.split("|", 1)[-1]
await sandbox_metadata_db.assign_sandbox_to_jobs(
jobs_ids=[job_id],
pfn=short_pfn,
sb_type=SandboxType.Output,
sb_type=SandboxType(sandbox_type.capitalize()),
se_name=settings.se_name,
)

Expand Down
4 changes: 0 additions & 4 deletions diracx-logic/src/diracx/logic/jobs/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,6 @@ async def create_jdl_jobs(jobs: list[JobSubmissionSpec], job_db: JobDB, config:
)
)

# Fix possible lack of brackets
if original_jdl.strip()[0] != "[":
original_jdl = f"[{original_jdl}]"

original_jdls.append(
(
original_jdl,
Expand Down
9 changes: 6 additions & 3 deletions diracx-routers/src/diracx/routers/jobs/sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,22 @@ async def get_job_sandbox(
return await get_job_sandbox_bl(job_id, sandbox_metadata_db, sandbox_type)


@router.patch("/{job_id}/sandbox/output")
@router.patch("/{job_id}/sandbox/{sandbox_type}")
async def assign_sandbox_to_job(
job_id: int,
pfn: Annotated[str, Body(max_length=256, pattern=SANDBOX_PFN_REGEX)],
sandbox_type: Literal["input", "output"],
sandbox_metadata_db: SandboxMetadataDB,
job_db: JobDB,
settings: SandboxStoreSettings,
check_permissions: CheckWMSPolicyCallable,
):
"""Map the pfn as output sandbox to job."""
"""Map the pfn as input or output sandbox to job."""
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id])
try:
await assign_sandbox_to_job_bl(job_id, pfn, sandbox_metadata_db, settings)
await assign_sandbox_to_job_bl(
job_id, pfn, sandbox_metadata_db, sandbox_type, settings
)
except SandboxNotFoundError as e:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found"
Expand Down