From f98f2dffa3f7e515870597c642861751a0e2c6bd Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Tue, 21 May 2024 17:56:24 +1200 Subject: [PATCH 1/4] sync data-level of subscription --- cylc/uiserver/data_store_mgr.py | 133 ++++++++++++++++++++++++++------ 1 file changed, 109 insertions(+), 24 deletions(-) diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index b7e526a0..fee55548 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -101,7 +101,22 @@ def __init__(self, workflows_mgr, log, max_threads=10): self.log = log self.data = {} self.w_subs: Dict[str, WorkflowSubscriber] = {} - self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'} + self.topics = { + ALL_DELTAS.encode('utf-8'), + WORKFLOW.encode('utf-8'), + b'shutdown' + } + # If fragments in graphql sub for minimal sync + self.min_sync_fragments = { + 'AddedDelta', + 'WorkflowData', + 'UpdatedDelta' + } + # set of workflows to sync all data + self.full_sync_workflows = set() + self.full_sync_gql_subs = set() + # dict of workflow full sync subscriber IDs + self.full_sync_workflow_gql_subs = {} self.loop = None self.executor = ThreadPoolExecutor(max_threads) self.delta_queues = {} @@ -161,6 +176,9 @@ async def connect_workflow(self, w_id, contact_data): self.delta_queues[w_id] = {} + # setup sync subscriber set + self.full_sync_workflow_gql_subs[w_id] = set() + # Might be options other than threads to achieve # non-blocking subscriptions, but this works. self.executor.submit( @@ -170,7 +188,25 @@ async def connect_workflow(self, w_id, contact_data): contact_data[CFF.HOST], contact_data[CFF.PUBLISH_PORT] ) - successful_updates = await self._entire_workflow_update(ids=[w_id]) + + result = await self.workflow_data_update(w_id, minimal=True) + + if result: + # don't update the contact data until we have successfully updated + self._update_contact(w_id, contact_data) + + @log_call + async def workflow_data_update( + self, + w_id: str, + minimal: Optional[bool] = None + ): + if minimal is None: + minimal = w_id in self.full_sync_workflows + successful_updates = await self._entire_workflow_update( + ids=[w_id], + minimal=minimal + ) if w_id not in successful_updates: # something went wrong, undo any changes to allow for subsequent @@ -178,9 +214,7 @@ async def connect_workflow(self, w_id, contact_data): self.log.info(f'failed to connect to {w_id}') self.disconnect_workflow(w_id) return False - else: - # don't update the contact data until we have successfully updated - self._update_contact(w_id, contact_data) + return True @log_call def disconnect_workflow(self, w_id, update_contact=True): @@ -207,6 +241,9 @@ def disconnect_workflow(self, w_id, update_contact=True): if w_id in self.w_subs: self.w_subs[w_id].stop() del self.w_subs[w_id] + if w_id in self.full_sync_workflow_gql_subs: + del self.full_sync_workflow_gql_subs[w_id] + self.full_sync_workflows.discard(w_id) def get_workflows(self): """Return all workflows the data store is currently tracking. 
@@ -283,8 +320,18 @@ def _update_workflow_data(self, topic, delta, w_id): # close connections self.disconnect_workflow(w_id) return - self._apply_all_delta(w_id, delta) - self._delta_store_to_queues(w_id, topic, delta) + elif topic == WORKFLOW: + if w_id in self.full_sync_workflows: + return + self._apply_delta(w_id, WORKFLOW, delta) + # might seem clunky, but as with contact update, making it look + # like an ALL_DELTA avoids changing the resolver in cylc-flow + all_deltas = DELTAS_MAP[ALL_DELTAS]() + all_deltas.workflow.CopyFrom(delta) + self._delta_store_to_queues(w_id, ALL_DELTAS, all_deltas) + elif w_id in self.full_sync_workflows: + self._apply_all_delta(w_id, delta) + self._delta_store_to_queues(w_id, topic, delta) def _clear_data_field(self, w_id, field_name): if field_name == WORKFLOW: @@ -295,22 +342,26 @@ def _clear_data_field(self, w_id, field_name): def _apply_all_delta(self, w_id, delta): """Apply the AllDeltas delta.""" for field, sub_delta in delta.ListFields(): - delta_time = getattr(sub_delta, 'time', 0.0) - # If the workflow has reloaded clear the data before - # delta application. - if sub_delta.reloaded: - self._clear_data_field(w_id, field.name) - self.data[w_id]['delta_times'][field.name] = 0.0 - # hard to catch errors in a threaded async app, so use try-except. - try: - # Apply the delta if newer than the previously applied. - if delta_time >= self.data[w_id]['delta_times'][field.name]: - apply_delta(field.name, sub_delta, self.data[w_id]) - self.data[w_id]['delta_times'][field.name] = delta_time - if not sub_delta.reloaded: - self._reconcile_update(field.name, sub_delta, w_id) - except Exception as exc: - self.log.exception(exc) + self._apply_delta(w_id, field.name, sub_delta) + + def _apply_delta(self, w_id, name, delta): + """Apply delta.""" + delta_time = getattr(delta, 'time', 0.0) + # If the workflow has reloaded clear the data before + # delta application. + if delta.reloaded: + self._clear_data_field(w_id, name) + self.data[w_id]['delta_times'][name] = 0.0 + # hard to catch errors in a threaded async app, so use try-except. + try: + # Apply the delta if newer than the previously applied. + if delta_time >= self.data[w_id]['delta_times'][name]: + apply_delta(name, delta, self.data[w_id]) + self.data[w_id]['delta_times'][name] = delta_time + if not delta.reloaded: + self._reconcile_update(name, delta, w_id) + except Exception as exc: + self.log.exception(exc) def _delta_store_to_queues(self, w_id, topic, delta): # Queue delta for graphql subscription resolving @@ -369,7 +420,9 @@ def _reconcile_update(self, topic, delta, w_id): self.log.exception(exc) async def _entire_workflow_update( - self, ids: Optional[list] = None + self, + ids: Optional[list] = None, + minimal: Optional[bool] = False ) -> Set[str]: """Update entire local data-store of workflow(s). 
@@ -414,6 +467,8 @@ async def _entire_workflow_update( for key in DATA_TEMPLATE } continue + elif minimal: + continue new_data[field.name] = {n.id: n for n in value} self.data[w_id] = new_data successes.add(w_id) @@ -502,3 +557,33 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str: else: # the workflow has not yet run return 'not yet run' + + def graphql_sub_interrogate(self, sub_id, info): + """Scope data requirements.""" + fragments = set(info.fragments.keys()) + minimal = ( + fragments <= self.min_sync_fragments + and bool(fragments) + ) + if not minimal: + self.full_sync_gql_subs.add(sub_id) + return minimal + + async def graphql_sub_data_match(self, w_id, sub_id): + """Match store data level to requested graphql subscription.""" + if ( + sub_id in self.full_sync_gql_subs + and sub_id not in self.full_sync_workflow_gql_subs[w_id] + ): + self.full_sync_workflow_gql_subs[w_id].add(sub_id) + await self.workflow_data_update(w_id, minimal=False) + + self.full_sync_workflows.add(w_id) + + def graphql_sub_discard(self, sub_id): + """Discard graphql subscription references.""" + self.full_sync_gql_subs.discard(sub_id) + for w_id in self.full_sync_workflow_gql_subs: + self.full_sync_workflow_gql_subs[w_id].discard(w_id) + if not self.full_sync_workflow_gql_subs[w_id]: + self.full_sync_workflows.discard(w_id) From 2d810c78d1bc013b68c7ee1c8607a26c3084d4ff Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 30 May 2024 18:46:26 +1200 Subject: [PATCH 2/4] define min/max, request workflow only --- cylc/uiserver/data_store_mgr.py | 165 ++++++++++++++++++++------------ 1 file changed, 103 insertions(+), 62 deletions(-) diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index fee55548..5d333242 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -36,7 +36,7 @@ from copy import deepcopy from pathlib import Path import time -from typing import Dict, Optional, Set +from typing import Dict, List, Set from cylc.flow.exceptions import WorkflowStopped from cylc.flow.id import Tokens @@ -56,6 +56,27 @@ from .utils import fmt_call from .workflows_mgr import workflow_request +MIN_LEVEL = 'min' +MAX_LEVEL = 'max' +SUBSCRIPTION_LEVELS = { + MIN_LEVEL: { + 'topics': {WORKFLOW.encode('utf-8'), b'shutdown'}, + 'criteria': { + 'fragments': { + 'AddedDelta', + 'WorkflowData', + 'UpdatedDelta' + }, + }, + 'request': 'pb_workflow_only', + }, + MAX_LEVEL: { + 'topics': {ALL_DELTAS.encode('utf-8'), b'shutdown'}, + 'criteria': {'fragments': set()}, + 'request': 'pb_entire_workflow', + }, +} + def log_call(fcn): """Decorator for data store methods we want to log.""" @@ -101,22 +122,13 @@ def __init__(self, workflows_mgr, log, max_threads=10): self.log = log self.data = {} self.w_subs: Dict[str, WorkflowSubscriber] = {} - self.topics = { - ALL_DELTAS.encode('utf-8'), - WORKFLOW.encode('utf-8'), - b'shutdown' - } - # If fragments in graphql sub for minimal sync - self.min_sync_fragments = { - 'AddedDelta', - 'WorkflowData', - 'UpdatedDelta' + # graphql subscription level + self.sync_level_graphql_subs = { + MIN_LEVEL: set(), + MAX_LEVEL: set() } - # set of workflows to sync all data - self.full_sync_workflows = set() - self.full_sync_gql_subs = set() - # dict of workflow full sync subscriber IDs - self.full_sync_workflow_gql_subs = {} + # workflow graphql subscription by level + self.sync_level_workflow_graphql_subs = {} self.loop = None self.executor = ThreadPoolExecutor(max_threads) self.delta_queues = {} @@ -141,6 +153,12 @@ async def 
register_workflow(self, w_id: str, is_active: bool) -> None:
             status_msg=self._get_status_msg(w_id, is_active),
         )
 
+        # setup sync subscriber set
+        self.sync_level_workflow_graphql_subs[w_id] = {
+            MIN_LEVEL: set(),
+            MAX_LEVEL: set()
+        }
+
     @log_call
     async def unregister_workflow(self, w_id):
         """Remove a workflow from the data store entirely.
@@ -176,8 +194,9 @@ async def connect_workflow(self, w_id, contact_data):
 
         self.delta_queues[w_id] = {}
 
-        # setup sync subscriber set
-        self.full_sync_workflow_gql_subs[w_id] = set()
+        level = MIN_LEVEL
+        if self.sync_level_workflow_graphql_subs[w_id][MAX_LEVEL]:
+            level = MAX_LEVEL
 
         # Might be options other than threads to achieve
         # non-blocking subscriptions, but this works.
@@ -186,10 +205,11 @@ async def connect_workflow(self, w_id, contact_data):
             w_id,
             contact_data['name'],
             contact_data[CFF.HOST],
-            contact_data[CFF.PUBLISH_PORT]
+            contact_data[CFF.PUBLISH_PORT],
+            SUBSCRIPTION_LEVELS[level]['topics']
         )
 
-        result = await self.workflow_data_update(w_id, minimal=True)
+        result = await self.workflow_data_update(w_id, level)
 
         if result:
             # don't update the contact data until we have successfully updated
@@ -199,13 +219,12 @@ async def connect_workflow(self, w_id, contact_data):
     async def workflow_data_update(
         self,
         w_id: str,
-        minimal: Optional[bool] = None
+        level: str,
     ):
-        if minimal is None:
-            minimal = w_id in self.full_sync_workflows
-        successful_updates = await self._entire_workflow_update(
-            ids=[w_id],
-            minimal=minimal
+        # the f-string coerces the looked-up request name to str for mypy
+        successful_updates = await self._workflow_update(
+            [w_id],
+            f'{SUBSCRIPTION_LEVELS[level]["request"]}'
         )
 
         if w_id not in successful_updates:
@@ -241,9 +260,6 @@ def disconnect_workflow(self, w_id, update_contact=True):
         if w_id in self.w_subs:
             self.w_subs[w_id].stop()
             del self.w_subs[w_id]
-        if w_id in self.full_sync_workflow_gql_subs:
-            del self.full_sync_workflow_gql_subs[w_id]
-            self.full_sync_workflows.discard(w_id)
 
     def get_workflows(self):
         """Return all workflows the data store is currently tracking.
@@ -273,8 +289,10 @@ def _purge_workflow(self, w_id):
             del self.data[w_id]
         if w_id in self.delta_queues:
             del self.delta_queues[w_id]
+        if w_id in self.sync_level_workflow_graphql_subs:
+            del self.sync_level_workflow_graphql_subs[w_id]
 
-    def _start_subscription(self, w_id, reg, host, port):
+    def _start_subscription(self, w_id, reg, host, port, topics):
         """Instantiate and run subscriber data-store sync.
 
         Args:
@@ -282,6 +300,7 @@ def _start_subscription(self, w_id, reg, host, port):
             reg (str): Registered workflow name.
             host (str): Hostname of target workflow.
             port (int): Port of target workflow.
+            topics (set): Set of topics (bytes) to subscribe to.
""" self.w_subs[w_id] = WorkflowSubscriber( @@ -289,7 +308,7 @@ def _start_subscription(self, w_id, reg, host, port): host=host, port=port, context=self.workflows_mgr.context, - topics=self.topics + topics=topics ) self.w_subs[w_id].loop.run_until_complete( self.w_subs[w_id].subscribe( @@ -321,7 +340,7 @@ def _update_workflow_data(self, topic, delta, w_id): self.disconnect_workflow(w_id) return elif topic == WORKFLOW: - if w_id in self.full_sync_workflows: + if self.sync_level_workflow_graphql_subs[w_id][MAX_LEVEL]: return self._apply_delta(w_id, WORKFLOW, delta) # might seem clunky, but as with contact update, making it look @@ -329,7 +348,7 @@ def _update_workflow_data(self, topic, delta, w_id): all_deltas = DELTAS_MAP[ALL_DELTAS]() all_deltas.workflow.CopyFrom(delta) self._delta_store_to_queues(w_id, ALL_DELTAS, all_deltas) - elif w_id in self.full_sync_workflows: + else: self._apply_all_delta(w_id, delta) self._delta_store_to_queues(w_id, topic, delta) @@ -419,10 +438,8 @@ def _reconcile_update(self, topic, delta, w_id): except Exception as exc: self.log.exception(exc) - async def _entire_workflow_update( - self, - ids: Optional[list] = None, - minimal: Optional[bool] = False + async def _workflow_update( + self, ids: List[str], req_method: str, ) -> Set[str]: """Update entire local data-store of workflow(s). @@ -430,11 +447,6 @@ async def _entire_workflow_update( ids: List of workflow external IDs. """ - if ids is None: - ids = [] - - # Request new data - req_method = 'pb_entire_workflow' requests = { w_id: workflow_request( @@ -467,8 +479,6 @@ async def _entire_workflow_update( for key in DATA_TEMPLATE } continue - elif minimal: - continue new_data[field.name] = {n.id: n for n in value} self.data[w_id] = new_data successes.add(w_id) @@ -558,32 +568,63 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str: # the workflow has not yet run return 'not yet run' + async def _update_subscription_level(self, w_id, level): + """Update level of data subscribed to.""" + sub = self.w_subs.get(w_id) + if sub: + stop_topics = sub.topics.difference( + SUBSCRIPTION_LEVELS[level]['topics'] + ) + start_topics = SUBSCRIPTION_LEVELS[level]['topics'].difference( + sub.topics + ) + for stop_topic in stop_topics: + sub.unsubscribe_topic(stop_topic) + # Doing this after unsubscribe and before subscribe + # to make sure old topics stop and new data is in place. 
+ await self.workflow_data_update(w_id, level) + for start_topic in start_topics: + sub.subscribe_topic(start_topic) + def graphql_sub_interrogate(self, sub_id, info): """Scope data requirements.""" fragments = set(info.fragments.keys()) minimal = ( - fragments <= self.min_sync_fragments + ( + fragments + <= SUBSCRIPTION_LEVELS[MIN_LEVEL]['criteria']['fragments'] + ) and bool(fragments) ) - if not minimal: - self.full_sync_gql_subs.add(sub_id) - return minimal + if minimal: + self.sync_level_graphql_subs[MIN_LEVEL].add(sub_id) + return + self.sync_level_graphql_subs[MAX_LEVEL].add(sub_id) async def graphql_sub_data_match(self, w_id, sub_id): """Match store data level to requested graphql subscription.""" - if ( - sub_id in self.full_sync_gql_subs - and sub_id not in self.full_sync_workflow_gql_subs[w_id] - ): - self.full_sync_workflow_gql_subs[w_id].add(sub_id) - await self.workflow_data_update(w_id, minimal=False) - - self.full_sync_workflows.add(w_id) + sync_level_wsubs = self.sync_level_workflow_graphql_subs[w_id] + if sub_id in self.sync_level_graphql_subs[MAX_LEVEL]: + if not sync_level_wsubs[MAX_LEVEL]: + sync_level_wsubs[MAX_LEVEL].add(sub_id) + await self._update_subscription_level(w_id, MAX_LEVEL) + else: + sync_level_wsubs[MIN_LEVEL].add(sub_id) - def graphql_sub_discard(self, sub_id): + async def graphql_sub_discard(self, sub_id): """Discard graphql subscription references.""" - self.full_sync_gql_subs.discard(sub_id) - for w_id in self.full_sync_workflow_gql_subs: - self.full_sync_workflow_gql_subs[w_id].discard(w_id) - if not self.full_sync_workflow_gql_subs[w_id]: - self.full_sync_workflows.discard(w_id) + level = MIN_LEVEL + if sub_id in self.sync_level_graphql_subs[MAX_LEVEL]: + level = MAX_LEVEL + self.sync_level_graphql_subs[level].discard(sub_id) + for w_id in self.sync_level_workflow_graphql_subs: + self.sync_level_workflow_graphql_subs[w_id][level].discard( + sub_id + ) + # if there are no more max level subscriptions after removal + # of a max level sub, downgrade to min. 
+ if ( + not self.sync_level_workflow_graphql_subs[w_id][level] + and level is MAX_LEVEL + ): + await self._update_subscription_level(w_id, MIN_LEVEL) From 2605026328a6168664c6a32d27131c1652bc12af Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 30 May 2024 20:13:45 +1200 Subject: [PATCH 3/4] test fixes --- cylc/uiserver/tests/test_data_store_mgr.py | 49 +++++++++++++++------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/cylc/uiserver/tests/test_data_store_mgr.py b/cylc/uiserver/tests/test_data_store_mgr.py index ad95cd5a..fbdc34b9 100644 --- a/cylc/uiserver/tests/test_data_store_mgr.py +++ b/cylc/uiserver/tests/test_data_store_mgr.py @@ -25,17 +25,21 @@ from cylc.flow.network import ZMQSocketBase from cylc.flow.workflow_files import ContactFileFields as CFF -from cylc.uiserver.data_store_mgr import DataStoreMgr +from cylc.uiserver.data_store_mgr import ( + DataStoreMgr, + MAX_LEVEL, + SUBSCRIPTION_LEVELS +) from .conftest import AsyncClientFixture -async def test_entire_workflow_update( +async def test_workflow_update( async_client: AsyncClientFixture, data_store_mgr: DataStoreMgr, make_entire_workflow ): - """Test that ``entire_workflow_update`` is executed successfully.""" + """Test that ``_workflow_update`` is executed successfully.""" w_id = 'workflow_id' entire_workflow = make_entire_workflow(f'{w_id}') async_client.will_return(entire_workflow.SerializeToString()) @@ -45,10 +49,13 @@ async def test_entire_workflow_update( 'req_client': async_client } - # Call the entire_workflow_update function. + # Call the _workflow_update function. # This should use the client defined above (``async_client``) when # calling ``workflow_request``. - await data_store_mgr._entire_workflow_update() + await data_store_mgr._workflow_update( + [w_id], + SUBSCRIPTION_LEVELS[MAX_LEVEL]["request"] + ) # The ``DataStoreMgr`` sets the workflow data retrieved in its # own ``.data`` dictionary, which will contain Protobuf message @@ -61,12 +68,12 @@ async def test_entire_workflow_update( assert entire_workflow.workflow.id == w_id_data['workflow'].id -async def test_entire_workflow_update_ignores_timeout_message( +async def test_workflow_update_ignores_timeout_message( async_client: AsyncClientFixture, data_store_mgr: DataStoreMgr ): """ - Test that ``entire_workflow_update`` ignores if the client + Test that ``_workflow_update`` ignores if the client receives a ``MSG_TIMEOUT`` message. """ w_id = 'workflow_id' @@ -77,10 +84,13 @@ async def test_entire_workflow_update_ignores_timeout_message( 'req_client': async_client } - # Call the entire_workflow_update function. + # Call the _workflow_update function. # This should use the client defined above (``async_client``) when # calling ``workflow_request``. - await data_store_mgr._entire_workflow_update() + await data_store_mgr._workflow_update( + [w_id], + SUBSCRIPTION_LEVELS[MAX_LEVEL]["request"] + ) # When a ClientTimeout happens, the ``DataStoreMgr`` object ignores # that message. 
So it means that its ``.data`` dictionary MUST NOT @@ -88,13 +98,13 @@ async def test_entire_workflow_update_ignores_timeout_message( assert w_id not in data_store_mgr.data -async def test_entire_workflow_update_gather_error( +async def test_workflow_update_gather_error( async_client: AsyncClientFixture, data_store_mgr: DataStoreMgr, caplog: pytest.LogCaptureFixture, ): """ - Test that if ``asyncio.gather`` in ``entire_workflow_update`` + Test that if ``asyncio.gather`` in ``_workflow_update`` has a coroutine raising an error, it will handle the error correctly. """ # The ``AsyncClient`` will raise an error. This will happen when @@ -110,10 +120,13 @@ async def test_entire_workflow_update_gather_error( 'req_client': async_client } - # Call the entire_workflow_update function. + # Call the _workflow_update function. # This should use the client defined above (``async_client``) when # calling ``workflow_request``. - await data_store_mgr._entire_workflow_update() + await data_store_mgr._workflow_update( + ['workflow_id'], + SUBSCRIPTION_LEVELS[MAX_LEVEL]["request"] + ) assert caplog.record_tuples == [ ('cylc', 40, 'Error communicating with myflow'), ('cylc', 40, 'x'), @@ -124,19 +137,22 @@ async def test_entire_workflow_update_gather_error( assert exc_info and exc_info[0] == ValueError -async def test_entire_workflow_update__stopped_workflow( +async def test_workflow_update__stopped_workflow( async_client: AsyncClientFixture, data_store_mgr: DataStoreMgr, caplog: pytest.LogCaptureFixture, ): - """Test that DataStoreMgr._entire_workflow_update() handles a stopped + """Test that DataStoreMgr._workflow_update() handles a stopped workflow reasonably.""" exc = WorkflowStopped('myflow') async_client.will_return(exc) data_store_mgr.workflows_mgr.workflows['workflow_id'] = { 'req_client': async_client } - await data_store_mgr._entire_workflow_update() + await data_store_mgr._workflow_update( + ['workflow_id'], + SUBSCRIPTION_LEVELS[MAX_LEVEL]["request"] + ) assert caplog.record_tuples == [ ('cylc', 40, f'WorkflowStopped: {exc}'), ] @@ -274,6 +290,7 @@ async def test_workflow_connect_fail( # WorkflowRuntimeServer with the correct endpoints and auth assert [record.message for record in caplog.records] == [ "[data-store] connect_workflow('~user/workflow_id', )", + "[data-store] workflow_data_update('~user/workflow_id', 'min')", 'failed to connect to ~user/workflow_id', "[data-store] disconnect_workflow('~user/workflow_id')", ] From b2c0b6e3f0b3df35e8b5b86827b28bdec149b53a Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Wed, 12 Jun 2024 12:01:17 +1200 Subject: [PATCH 4/4] query sync level and expiry --- cylc/uiserver/app.py | 6 +++ cylc/uiserver/data_store_mgr.py | 78 +++++++++++++++++++++++++++------ cylc/uiserver/resolvers.py | 11 ++--- 3 files changed, 76 insertions(+), 19 deletions(-) diff --git a/cylc/uiserver/app.py b/cylc/uiserver/app.py index 8e9fb00c..40f42919 100644 --- a/cylc/uiserver/app.py +++ b/cylc/uiserver/app.py @@ -460,6 +460,12 @@ def initialize_settings(self): self.scan_interval * 1000 ).start() + # configure the sync level expiry check + ioloop.PeriodicCallback( + self.data_store_mgr.check_query_sync_level_expiries, + self.data_store_mgr.SYNC_LEVEL_TIMER_INTERVAL * 1000 + ).start() + def initialize_handlers(self): self.authobj = self.set_auth() self.set_sub_server() diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index 5d333242..0c790499 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -36,7 +36,7 @@ from copy 
import deepcopy
 from pathlib import Path
 import time
-from typing import Dict, List, Set
+from typing import Dict, Iterable, List, Optional, Set
 
 from cylc.flow.exceptions import WorkflowStopped
 from cylc.flow.id import Tokens
@@ -76,6 +76,8 @@
         'request': 'pb_entire_workflow',
     },
 }
+# how long a query-raised sync level is held after the query (seconds)
+QUERY_SYNC_EXPIRY = 60
 
 
 def log_call(fcn):
     """Decorator for data store methods we want to log."""
@@ -116,6 +118,7 @@ class DataStoreMgr:
     INIT_DATA_RETRY_DELAY = 0.5  # seconds
     RECONCILE_TIMEOUT = 5.  # seconds
     PENDING_DELTA_CHECK_INTERVAL = 0.5
+    SYNC_LEVEL_TIMER_INTERVAL = 30  # seconds
 
     def __init__(self, workflows_mgr, log, max_threads=10):
         self.workflows_mgr = workflows_mgr
@@ -128,7 +131,11 @@ def __init__(self, workflows_mgr, log, max_threads=10):
             MAX_LEVEL: set()
         }
         # workflow graphql subscription by level
-        self.sync_level_workflow_graphql_subs = {}
+        self.workflow_sync_level_graphql_subs = {}
+        # query sync expiry times, by workflow
+        self.workflow_query_sync_timers = {}
+        # effective sync level, by workflow
+        self.workflow_sync_level = {}
         self.loop = None
         self.executor = ThreadPoolExecutor(max_threads)
         self.delta_queues = {}
@@ -153,12 +160,18 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None:
             status_msg=self._get_status_msg(w_id, is_active),
         )
 
-        # setup sync subscriber set
-        self.sync_level_workflow_graphql_subs[w_id] = {
+        # setup sync subscriber level sets
+        self.workflow_sync_level_graphql_subs[w_id] = {
             MIN_LEVEL: set(),
             MAX_LEVEL: set()
         }
 
+        # initialise the query sync expiry timer
+        self.workflow_query_sync_timers[w_id] = 0.0
+
+        # start at the minimal sync level
+        self.workflow_sync_level[w_id] = MIN_LEVEL
+
     @log_call
     async def unregister_workflow(self, w_id):
         """Remove a workflow from the data store entirely.
@@ -195,7 +208,7 @@ async def connect_workflow(self, w_id, contact_data):
         self.delta_queues[w_id] = {}
 
         level = MIN_LEVEL
-        if self.sync_level_workflow_graphql_subs[w_id][MAX_LEVEL]:
+        if self.workflow_sync_level_graphql_subs[w_id][MAX_LEVEL]:
             level = MAX_LEVEL
 
         # Might be options other than threads to achieve
@@ -289,8 +302,12 @@ def _purge_workflow(self, w_id):
             del self.data[w_id]
         if w_id in self.delta_queues:
             del self.delta_queues[w_id]
-        if w_id in self.sync_level_workflow_graphql_subs:
-            del self.sync_level_workflow_graphql_subs[w_id]
+        if w_id in self.workflow_sync_level_graphql_subs:
+            del self.workflow_sync_level_graphql_subs[w_id]
+        if w_id in self.workflow_query_sync_timers:
+            del self.workflow_query_sync_timers[w_id]
+        if w_id in self.workflow_sync_level:
+            del self.workflow_sync_level[w_id]
 
     def _start_subscription(self, w_id, reg, host, port, topics):
         """Instantiate and run subscriber data-store sync.
@@ -340,7 +357,7 @@ def _update_workflow_data(self, topic, delta, w_id): self.disconnect_workflow(w_id) return elif topic == WORKFLOW: - if self.sync_level_workflow_graphql_subs[w_id][MAX_LEVEL]: + if self.workflow_sync_level_graphql_subs[w_id][MAX_LEVEL]: return self._apply_delta(w_id, WORKFLOW, delta) # might seem clunky, but as with contact update, making it look @@ -585,6 +602,7 @@ async def _update_subscription_level(self, w_id, level): await self.workflow_data_update(w_id, level) for start_topic in start_topics: sub.subscribe_topic(start_topic) + self.workflow_sync_level[w_id] = level def graphql_sub_interrogate(self, sub_id, info): """Scope data requirements.""" @@ -603,10 +621,11 @@ def graphql_sub_interrogate(self, sub_id, info): async def graphql_sub_data_match(self, w_id, sub_id): """Match store data level to requested graphql subscription.""" - sync_level_wsubs = self.sync_level_workflow_graphql_subs[w_id] + sync_level_wsubs = self.workflow_sync_level_graphql_subs[w_id] if sub_id in self.sync_level_graphql_subs[MAX_LEVEL]: - if not sync_level_wsubs[MAX_LEVEL]: - sync_level_wsubs[MAX_LEVEL].add(sub_id) + no_max = not sync_level_wsubs[MAX_LEVEL] + sync_level_wsubs[MAX_LEVEL].add(sub_id) + if no_max: await self._update_subscription_level(w_id, MAX_LEVEL) else: sync_level_wsubs[MIN_LEVEL].add(sub_id) @@ -617,14 +636,45 @@ async def graphql_sub_discard(self, sub_id): if sub_id in self.sync_level_graphql_subs[MAX_LEVEL]: level = MAX_LEVEL self.sync_level_graphql_subs[level].discard(sub_id) - for w_id in self.sync_level_workflow_graphql_subs: - self.sync_level_workflow_graphql_subs[w_id][level].discard( + for w_id in self.workflow_sync_level_graphql_subs: + self.workflow_sync_level_graphql_subs[w_id][level].discard( sub_id ) # if there are no more max level subscriptions after removal # of a max level sub, downgrade to min. 
if ( - not self.sync_level_workflow_graphql_subs[w_id][level] + not self.workflow_sync_level_graphql_subs[w_id][level] and level is MAX_LEVEL + and self.workflow_query_sync_timers[w_id] < time.time() + ): + await self._update_subscription_level(w_id, MIN_LEVEL) + + async def set_query_sync_levels( + self, + w_ids: Iterable[str], + level: Optional[str] = None, + expire_delay: Optional[float] = None, + ): + """Set a workflow sync level.""" + if level is None: + level = MAX_LEVEL + if expire_delay is None: + expire_delay = QUERY_SYNC_EXPIRY + expire_time = time.time() + expire_delay + for w_id in w_ids: + self.workflow_query_sync_timers[w_id] = expire_time + if self.workflow_sync_level[w_id] is level: + # Already required level + continue + await self._update_subscription_level(w_id, level) + + async def check_query_sync_level_expiries(self): + """Check for and downgrade expired sub levels.""" + for w_id, expiry in self.workflow_query_sync_timers.items(): + if ( + w_id in self.w_subs + and self.workflow_sync_level[w_id] is not MIN_LEVEL + and not self.workflow_sync_level_graphql_subs[w_id][MAX_LEVEL] + and expiry < time.time() ): await self._update_subscription_level(w_id, MIN_LEVEL) diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index db3e357f..69318b54 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -45,7 +45,7 @@ WorkflowFilesError, ) from cylc.flow.id import Tokens -from cylc.flow.network.resolvers import BaseResolvers +from cylc.flow.network.resolvers import BaseResolvers, workflow_filter from cylc.flow.scripts.clean import CleanOptions from cylc.flow.scripts.clean import run @@ -497,11 +497,12 @@ async def mutator( ) } w_ids = [ - flow[WORKFLOW].id - for flow in await self.get_workflows_data(w_args)] + workflow[WORKFLOW].id + for workflow in self.data_store_mgr.data.values() + if workflow_filter(workflow, w_args) + ] if not w_ids: - return [{ - 'response': (False, 'No matching workflows')}] + return [{'response': (False, 'No matching workflows')}] # Pass the request to the workflow GraphQL endpoints _, variables, _, _ = info.context.get( # type: ignore[union-attr] 'graphql_params'