Skip to content

Add tracking of submitted transforms #634

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/command_line.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ list
^^^^

Show all of the cached transforms along with the run time, code
generator, and number of resulting files
generator, and number of resulting files. Submitted transforms that have
not yet completed are also listed, with the run time and file count shown
as ``Pending``.

delete
^^^^^^
Expand Down
10 changes: 10 additions & 0 deletions servicex/app/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def list():
table.add_column("Files")
table.add_column("Format")
runs = cache.cached_queries()
pending = cache.submitted_queries()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please change "pending" to "submitted"

for r in runs:
table.add_row(
r.title,
Expand All @@ -71,6 +72,15 @@ def list():
str(r.files),
r.result_format,
)
for r in pending:
table.add_row(
r.get("title", ""),
r.get("codegen", ""),
r.get("request_id", ""),
"Pending",
"Pending",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In particular as to what is shown to the users

str(r.get("result_format", "")),
)
rich.print(table)


Expand Down
30 changes: 30 additions & 0 deletions servicex/query_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import os
from pathlib import Path
from typing import List, Optional
from datetime import datetime, timezone
from filelock import FileLock
from tinydb import TinyDB, Query, where

Expand Down Expand Up @@ -148,6 +149,24 @@ def update_transform_request_id(self, hash_value: str, request_id: str) -> None:
transform.hash == hash_value,
)

def cache_submitted_transform(
    self,
    transform: TransformRequest,
    request_id: str,
    submit_time: Optional[str] = None,
) -> None:
    """Cache a transform that has been submitted but not yet completed.

    Upserts a lightweight record keyed on the transform's hash, so
    resubmitting the same query updates the existing pending entry
    instead of creating a duplicate.

    :param transform: The transform request that was submitted.
    :param request_id: The request id returned by the server on submission.
    :param submit_time: ISO-8601 timestamp of the submission. Defaults to
        the current UTC time; injectable to make callers/tests deterministic.
    """
    record = {
        "hash": transform.compute_hash(),
        "title": transform.title,
        "codegen": transform.codegen,
        "result_format": transform.result_format,
        "request_id": request_id,
        "status": "SUBMITTED",
        # Timezone-aware UTC timestamp so records compare consistently
        # across machines.
        "submit_time": submit_time
        if submit_time is not None
        else datetime.now(timezone.utc).isoformat(),
    }
    transforms = Query()
    with self.lock:
        # Upsert on hash: a resubmission of the same query replaces the
        # existing pending record rather than adding a second one.
        self.db.upsert(record, transforms.hash == record["hash"])

def get_transform_by_hash(self, hash: str) -> Optional[TransformedResults]:
"""
Returns completed transformations by hash
Expand Down Expand Up @@ -203,6 +222,17 @@ def cached_queries(self) -> List[TransformedResults]:
]
return result

def submitted_queries(self, status: str = "SUBMITTED") -> List[dict]:
    """Return transform records that are in the given in-flight state.

    Generalized per review: callers may filter on any status value;
    the default keeps the original behavior of returning only
    submitted-but-not-completed transforms.

    :param status: Transform status to match (default ``"SUBMITTED"``).
    :return: Raw TinyDB records (dicts) in that state which carry a
        ``request_id`` field.
    """
    transforms = Query()
    with self.lock:
        # list(...) materializes the results while the lock is held;
        # the previous pass-through comprehension was redundant.
        return list(
            self.db.search(
                (transforms.status == status) & transforms.request_id.exists()
            )
        )

def delete_record_by_request_id(self, request_id: str):
with self.lock:
self.db.remove(where("request_id") == request_id)
Expand Down
3 changes: 1 addition & 2 deletions servicex/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,7 @@ def transform_complete(task: Task):
self.request_id = self.cache.get_transform_request_id(sx_request_hash)
else:
self.request_id = await self.servicex.submit_transform(sx_request)
self.cache.update_transform_request_id(sx_request_hash, self.request_id)
self.cache.update_transform_status(sx_request_hash, "SUBMITTED")
self.cache.cache_submitted_transform(sx_request, self.request_id)

monitor_task = loop.create_task(
self.transform_status_listener(
Expand Down
6 changes: 3 additions & 3 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ async def test_submit_and_download_cache_miss(python_dataset, completed_status):
python_dataset.servicex.submit_transform = AsyncMock()
python_dataset.download_files = AsyncMock()
python_dataset.download_files.return_value = []
python_dataset.cache.update_transform_request_id = Mock()
python_dataset.cache.cache_submitted_transform = Mock()

signed_urls_only = False
expandable_progress = ExpandableProgress()
Expand Down Expand Up @@ -354,7 +354,7 @@ async def test_submit_and_download_cache_miss_overall_progress(
python_dataset.servicex.submit_transform = AsyncMock()
python_dataset.download_files = AsyncMock()
python_dataset.download_files.return_value = []
python_dataset.cache.update_transform_request_id = Mock()
python_dataset.cache.cache_submitted_transform = Mock()

signed_urls_only = False
expandable_progress = ExpandableProgress(overall_progress=True)
Expand Down Expand Up @@ -423,7 +423,7 @@ async def test_submit_and_download_cache_miss_signed_urls_only(
python_dataset.servicex.submit_transform = AsyncMock()
python_dataset.download_files = AsyncMock()
python_dataset.download_files.return_value = []
python_dataset.cache.update_transform_request_id = Mock()
python_dataset.cache.cache_submitted_transform = Mock()

signed_urls_only = True
expandable_progress = ExpandableProgress()
Expand Down
37 changes: 24 additions & 13 deletions tests/test_query_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,8 @@ def test_get_transform_request_id(transform_request, completed_status):
request_id = cache.get_transform_request_id(hash_value)
print(request_id)

# update the transform request with a request id and then check for the request id
cache.update_transform_status(hash_value, "SUBMITTED")
cache.update_transform_request_id(hash_value, "123456")
# cache the submitted transform and then check for the request id
cache.cache_submitted_transform(transform_request, "123456")
request_id = cache.get_transform_request_id(hash_value)
assert request_id == "123456"

Expand Down Expand Up @@ -276,18 +275,30 @@ def test_get_transform_request_status(transform_request, completed_status):

assert cache.is_transform_request_submitted(hash_value) is False

# cache transform
cache.update_transform_status(hash_value, "SUBMITTED")
cache.cache_transform(
cache.transformed_results(
transform=transform_request,
completed_status=completed_status,
data_dir="/foo/bar",
file_list=file_uris,
signed_urls=[],
)
# cache submitted transform
cache.cache_submitted_transform(
transform_request, "b8c508d0-ccf2-4deb-a1f7-65c839eebabf"
)

assert cache.is_transform_request_submitted(hash_value) is True

cache.close()


def test_cache_submitted_queries(transform_request):
    """A submitted transform is visible via submitted_queries()."""
    with tempfile.TemporaryDirectory() as temp_dir:
        config = Configuration(cache_path=temp_dir, api_endpoints=[])  # type: ignore
        cache = QueryCache(config)

        cache.cache_submitted_transform(transform_request, "123456")

        submitted = cache.submitted_queries()
        assert len(submitted) == 1
        record = submitted[0]
        assert record["status"] == "SUBMITTED"
        assert record["request_id"] == "123456"

        hash_value = transform_request.compute_hash()
        assert cache.is_transform_request_submitted(hash_value) is True

        cache.close()
Loading