From 8e7343c64aa2983018f8c37ed9b8b203a9242b93 Mon Sep 17 00:00:00 2001 From: Gordon Watts Date: Tue, 22 Jul 2025 16:23:08 -0400 Subject: [PATCH 1/2] Add submitted transform caching --- docs/command_line.rst | 3 ++- servicex/app/cache.py | 10 ++++++++++ servicex/query_cache.py | 30 ++++++++++++++++++++++++++++++ servicex/query_core.py | 3 +-- tests/test_dataset.py | 6 +++--- tests/test_query_cache.py | 37 ++++++++++++++++++++++++------------- 6 files changed, 70 insertions(+), 19 deletions(-) diff --git a/docs/command_line.rst b/docs/command_line.rst index efc2019f..a55bb1a6 100644 --- a/docs/command_line.rst +++ b/docs/command_line.rst @@ -77,7 +77,8 @@ list ^^^^ Show all of the cached transforms along with the run time, code -generator, and number of resulting files +generator, and number of resulting files. Pending submissions are +also listed with the run date and file count shown as ``Pending``. delete ^^^^^^ diff --git a/servicex/app/cache.py b/servicex/app/cache.py index 76be0811..45e0fb17 100644 --- a/servicex/app/cache.py +++ b/servicex/app/cache.py @@ -62,6 +62,7 @@ def list(): table.add_column("Files") table.add_column("Format") runs = cache.cached_queries() + pending = cache.submitted_queries() for r in runs: table.add_row( r.title, @@ -71,6 +72,15 @@ def list(): str(r.files), r.result_format, ) + for r in pending: + table.add_row( + r.get("title", ""), + r.get("codegen", ""), + r.get("request_id", ""), + "Pending", + "Pending", + str(r.get("result_format", "")), + ) rich.print(table) diff --git a/servicex/query_cache.py b/servicex/query_cache.py index 441a503d..aecb1fd7 100644 --- a/servicex/query_cache.py +++ b/servicex/query_cache.py @@ -29,6 +29,7 @@ import os from pathlib import Path from typing import List, Optional +from datetime import datetime, timezone from filelock import FileLock from tinydb import TinyDB, Query, where @@ -148,6 +149,24 @@ def update_transform_request_id(self, hash_value: str, request_id: str) -> None: transform.hash == hash_value, ) + def cache_submitted_transform( + self, transform: TransformRequest, request_id: str + ) -> None: + """Cache a transform that has been submitted but not completed.""" + + record = { + "hash": transform.compute_hash(), + "title": transform.title, + "codegen": transform.codegen, + "result_format": transform.result_format, + "request_id": request_id, + "status": "SUBMITTED", + "submit_time": datetime.now(timezone.utc).isoformat(), + } + transforms = Query() + with self.lock: + self.db.upsert(record, transforms.hash == record["hash"]) + def get_transform_by_hash(self, hash: str) -> Optional[TransformedResults]: """ Returns completed transformations by hash @@ -203,6 +222,17 @@ def cached_queries(self) -> List[TransformedResults]: ] return result + def submitted_queries(self) -> List[dict]: + """Return all transform records that are only submitted.""" + transforms = Query() + with self.lock: + return [ + doc + for doc in self.db.search( + (transforms.status == "SUBMITTED") & transforms.request_id.exists() + ) + ] + def delete_record_by_request_id(self, request_id: str): with self.lock: self.db.remove(where("request_id") == request_id) diff --git a/servicex/query_core.py b/servicex/query_core.py index ad2341ef..266108aa 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -325,8 +325,7 @@ def transform_complete(task: Task): self.request_id = self.cache.get_transform_request_id(sx_request_hash) else: self.request_id = await self.servicex.submit_transform(sx_request) - self.cache.update_transform_request_id(sx_request_hash, self.request_id) - self.cache.update_transform_status(sx_request_hash, "SUBMITTED") + self.cache.cache_submitted_transform(sx_request, self.request_id) monitor_task = loop.create_task( self.transform_status_listener( diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 1bf2446f..067d8a0c 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -321,7 +321,7 @@ async def test_submit_and_download_cache_miss(python_dataset, completed_status): python_dataset.servicex.submit_transform = AsyncMock() python_dataset.download_files = AsyncMock() python_dataset.download_files.return_value = [] - python_dataset.cache.update_transform_request_id = Mock() + python_dataset.cache.cache_submitted_transform = Mock() signed_urls_only = False expandable_progress = ExpandableProgress() @@ -354,7 +354,7 @@ async def test_submit_and_download_cache_miss_overall_progress( python_dataset.servicex.submit_transform = AsyncMock() python_dataset.download_files = AsyncMock() python_dataset.download_files.return_value = [] - python_dataset.cache.update_transform_request_id = Mock() + python_dataset.cache.cache_submitted_transform = Mock() signed_urls_only = False expandable_progress = ExpandableProgress(overall_progress=True) @@ -423,7 +423,7 @@ async def test_submit_and_download_cache_miss_signed_urls_only( python_dataset.servicex.submit_transform = AsyncMock() python_dataset.download_files = AsyncMock() python_dataset.download_files.return_value = [] - python_dataset.cache.update_transform_request_id = Mock() + python_dataset.cache.cache_submitted_transform = Mock() signed_urls_only = True expandable_progress = ExpandableProgress() diff --git a/tests/test_query_cache.py b/tests/test_query_cache.py index 25c649ff..363cfa20 100644 --- a/tests/test_query_cache.py +++ b/tests/test_query_cache.py @@ -242,9 +242,8 @@ def test_get_transform_request_id(transform_request, completed_status): request_id = cache.get_transform_request_id(hash_value) print(request_id) - # update the transform request with a request id and then check for the request id - cache.update_transform_status(hash_value, "SUBMITTED") - cache.update_transform_request_id(hash_value, "123456") + # cache the submitted transform and then check for the request id + cache.cache_submitted_transform(transform_request, "123456") request_id = cache.get_transform_request_id(hash_value) assert request_id == "123456" @@ -276,18 +275,30 @@ def test_get_transform_request_status(transform_request, completed_status): assert cache.is_transform_request_submitted(hash_value) is False - # cache transform - cache.update_transform_status(hash_value, "SUBMITTED") - cache.cache_transform( - cache.transformed_results( - transform=transform_request, - completed_status=completed_status, - data_dir="/foo/bar", - file_list=file_uris, - signed_urls=[], - ) + # cache submitted transform + cache.cache_submitted_transform( + transform_request, "b8c508d0-ccf2-4deb-a1f7-65c839eebabf" ) assert cache.is_transform_request_submitted(hash_value) is True cache.close() + + +def test_cache_submitted_queries(transform_request): + with tempfile.TemporaryDirectory() as temp_dir: + config = Configuration(cache_path=temp_dir, api_endpoints=[]) # type: ignore + cache = QueryCache(config) + + cache.cache_submitted_transform(transform_request, "123456") + + pending = cache.submitted_queries() + assert len(pending) == 1 + assert pending[0]["status"] == "SUBMITTED" + assert pending[0]["request_id"] == "123456" + assert ( + cache.is_transform_request_submitted(transform_request.compute_hash()) + is True + ) + + cache.close() From 6c04ca441f44d01e8cfb888d41f50e15318c7caf Mon Sep 17 00:00:00 2001 From: Gordon Watts Date: Tue, 22 Jul 2025 16:23:08 -0400 Subject: [PATCH 2/2] Add submitted transform caching --- docs/command_line.rst | 3 ++- servicex/app/cache.py | 10 ++++++++++ servicex/query_cache.py | 30 ++++++++++++++++++++++++++++++ servicex/query_core.py | 3 +-- tests/test_dataset.py | 6 +++--- tests/test_query_cache.py | 37 ++++++++++++++++++++++++------------- 6 files changed, 70 insertions(+), 19 deletions(-) diff --git a/docs/command_line.rst b/docs/command_line.rst index efc2019f..a55bb1a6 100644 --- a/docs/command_line.rst +++ b/docs/command_line.rst @@ -77,7 +77,8 @@ list ^^^^ Show all of the cached transforms along with the run time, code -generator, and number of resulting files +generator, and number of resulting files. Pending submissions are +also listed with the run date and file count shown as ``Pending``. delete ^^^^^^ diff --git a/servicex/app/cache.py b/servicex/app/cache.py index 76be0811..45e0fb17 100644 --- a/servicex/app/cache.py +++ b/servicex/app/cache.py @@ -62,6 +62,7 @@ def list(): table.add_column("Files") table.add_column("Format") runs = cache.cached_queries() + pending = cache.submitted_queries() for r in runs: table.add_row( r.title, @@ -71,6 +72,15 @@ def list(): str(r.files), r.result_format, ) + for r in pending: + table.add_row( + r.get("title", ""), + r.get("codegen", ""), + r.get("request_id", ""), + "Pending", + "Pending", + str(r.get("result_format", "")), + ) rich.print(table) diff --git a/servicex/query_cache.py b/servicex/query_cache.py index 441a503d..aecb1fd7 100644 --- a/servicex/query_cache.py +++ b/servicex/query_cache.py @@ -29,6 +29,7 @@ import os from pathlib import Path from typing import List, Optional +from datetime import datetime, timezone from filelock import FileLock from tinydb import TinyDB, Query, where @@ -148,6 +149,24 @@ def update_transform_request_id(self, hash_value: str, request_id: str) -> None: transform.hash == hash_value, ) + def cache_submitted_transform( + self, transform: TransformRequest, request_id: str + ) -> None: + """Cache a transform that has been submitted but not completed.""" + + record = { + "hash": transform.compute_hash(), + "title": transform.title, + "codegen": transform.codegen, + "result_format": transform.result_format, + "request_id": request_id, + "status": "SUBMITTED", + "submit_time": datetime.now(timezone.utc).isoformat(), + } + transforms = Query() + with self.lock: + self.db.upsert(record, transforms.hash == record["hash"]) + def get_transform_by_hash(self, hash: str) -> Optional[TransformedResults]: """ Returns completed transformations by hash @@ -203,6 +222,17 @@ def cached_queries(self) -> List[TransformedResults]: ] return result + def submitted_queries(self) -> List[dict]: + """Return all transform records that are only submitted.""" + transforms = Query() + with self.lock: + return [ + doc + for doc in self.db.search( + (transforms.status == "SUBMITTED") & transforms.request_id.exists() + ) + ] + def delete_record_by_request_id(self, request_id: str): with self.lock: self.db.remove(where("request_id") == request_id) diff --git a/servicex/query_core.py b/servicex/query_core.py index ad2341ef..266108aa 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -325,8 +325,7 @@ def transform_complete(task: Task): self.request_id = self.cache.get_transform_request_id(sx_request_hash) else: self.request_id = await self.servicex.submit_transform(sx_request) - self.cache.update_transform_request_id(sx_request_hash, self.request_id) - self.cache.update_transform_status(sx_request_hash, "SUBMITTED") + self.cache.cache_submitted_transform(sx_request, self.request_id) monitor_task = loop.create_task( self.transform_status_listener( diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 1bf2446f..067d8a0c 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -321,7 +321,7 @@ async def test_submit_and_download_cache_miss(python_dataset, completed_status): python_dataset.servicex.submit_transform = AsyncMock() python_dataset.download_files = AsyncMock() python_dataset.download_files.return_value = [] - python_dataset.cache.update_transform_request_id = Mock() + python_dataset.cache.cache_submitted_transform = Mock() signed_urls_only = False expandable_progress = ExpandableProgress() @@ -354,7 +354,7 @@ async def test_submit_and_download_cache_miss_overall_progress( python_dataset.servicex.submit_transform = AsyncMock() python_dataset.download_files = AsyncMock() python_dataset.download_files.return_value = [] - python_dataset.cache.update_transform_request_id = Mock() + python_dataset.cache.cache_submitted_transform = Mock() signed_urls_only = False expandable_progress = ExpandableProgress(overall_progress=True) @@ -423,7 +423,7 @@ async def test_submit_and_download_cache_miss_signed_urls_only( python_dataset.servicex.submit_transform = AsyncMock() python_dataset.download_files = AsyncMock() python_dataset.download_files.return_value = [] - python_dataset.cache.update_transform_request_id = Mock() + python_dataset.cache.cache_submitted_transform = Mock() signed_urls_only = True expandable_progress = ExpandableProgress() diff --git a/tests/test_query_cache.py b/tests/test_query_cache.py index 25c649ff..363cfa20 100644 --- a/tests/test_query_cache.py +++ b/tests/test_query_cache.py @@ -242,9 +242,8 @@ def test_get_transform_request_id(transform_request, completed_status): request_id = cache.get_transform_request_id(hash_value) print(request_id) - # update the transform request with a request id and then check for the request id - cache.update_transform_status(hash_value, "SUBMITTED") - cache.update_transform_request_id(hash_value, "123456") + # cache the submitted transform and then check for the request id + cache.cache_submitted_transform(transform_request, "123456") request_id = cache.get_transform_request_id(hash_value) assert request_id == "123456" @@ -276,18 +275,30 @@ def test_get_transform_request_status(transform_request, completed_status): assert cache.is_transform_request_submitted(hash_value) is False - # cache transform - cache.update_transform_status(hash_value, "SUBMITTED") - cache.cache_transform( - cache.transformed_results( - transform=transform_request, - completed_status=completed_status, - data_dir="/foo/bar", - file_list=file_uris, - signed_urls=[], - ) + # cache submitted transform + cache.cache_submitted_transform( + transform_request, "b8c508d0-ccf2-4deb-a1f7-65c839eebabf" ) assert cache.is_transform_request_submitted(hash_value) is True cache.close() + + +def test_cache_submitted_queries(transform_request): + with tempfile.TemporaryDirectory() as temp_dir: + config = Configuration(cache_path=temp_dir, api_endpoints=[]) # type: ignore + cache = QueryCache(config) + + cache.cache_submitted_transform(transform_request, "123456") + + pending = cache.submitted_queries() + assert len(pending) == 1 + assert pending[0]["status"] == "SUBMITTED" + assert pending[0]["request_id"] == "123456" + assert ( + cache.is_transform_request_submitted(transform_request.compute_hash()) + is True + ) + + cache.close()