diff --git a/changes.d/675.feat.md b/changes.d/675.feat.md
new file mode 100644
index 00000000..9a40cc43
--- /dev/null
+++ b/changes.d/675.feat.md
@@ -0,0 +1 @@
+Add CPU time and Max RSS to the Analysis Tools.
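The queries added below recover CPU time, max RSS and memory-allocation figures from free-text 'message debug' rows in the task_events table. For orientation, a minimal sketch of the producing side this format implies; the helper itself is hypothetical (Cylc does not ship it), only the payload shape comes from the test fixtures in this diff:

import resource  # Unix-only

def profile_payload(mem_alloc: int) -> str:
    """Build a stats string in the shape run_task_query() parses.

    Hypothetical helper: the payload would be sent as a DEBUG-severity
    task message so that it lands in task_events with the event name
    'message debug', e.g. 'cpu_time 994 max_rss 40064 mem_alloc 1048576'.
    """
    usage = resource.getrusage(resource.RUSAGE_SELF)
    cpu_time = int(usage.ru_utime + usage.ru_stime)  # CPU seconds used
    max_rss = usage.ru_maxrss  # peak resident set size (kB on Linux)
    return f'cpu_time {cpu_time} max_rss {max_rss} mem_alloc {mem_alloc}'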
diff --git a/cylc/uiserver/schema.py b/cylc/uiserver/schema.py
index 364f9d89..1a0b1ffa 100644
--- a/cylc/uiserver/schema.py
+++ b/cylc/uiserver/schema.py
@@ -139,8 +139,8 @@ class Arguments:
     )
     start_cycle_point = CyclePoint(
         description=sstrip('''
-            Set the start cycle point, which may be after the initial cycle
-            point.
+            Set the start cycle point, which may be after the initial
+            cycle point.
 
             If the specified start point is not in the sequence, the next
             on-sequence point will be used.
@@ -350,8 +350,54 @@ def run_task_query(conn, workflow):
     # TODO: support all arguments including states
     # https://github.com/cylc/cylc-uiserver/issues/440
     tasks = []
+    total_of_totals = 0
     for row in conn.execute('''
-SELECT
+WITH profiler_stats AS (
+    SELECT
+        tj.name,
+        tj.cycle,
+        tj.submit_num,
+        tj.submit_status,
+        tj.time_run,
+        tj.time_run_exit,
+        tj.job_id,
+        tj.platform_name,
+        tj.time_submit,
+        tj.run_status,
+        CASE
+            WHEN te.event = 'message debug' THEN COALESCE(CAST(SUBSTR(te.message,
+                INSTR(te.message, 'mem_alloc ') + 10) AS INT), 0)
+            ELSE 0
+        END AS mem_alloc,
+        CASE
+            WHEN te.event = 'message debug' THEN COALESCE(
+                CAST(SUBSTR(te.message, INSTR(te.message, 'max_rss ') + 8,
+                    INSTR(te.message, ' mem_alloc') -
+                    (INSTR(te.message, 'max_rss ') + 8)) AS INT), 0)
+            ELSE 0
+        END AS max_rss,
+        CASE
+            WHEN te.event = 'message debug' THEN COALESCE(
+                CAST(SUBSTR(te.message, INSTR(te.message, 'cpu_time ') + 9,
+                    INSTR(te.message, ' max_rss') -
+                    (INSTR(te.message, 'cpu_time ') + 9)) AS INT), 0)
+            ELSE 0
+        END AS cpu_time,
+        STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit) AS total_time,
+        STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run) AS run_time,
+        STRFTIME('%s', time_run) - STRFTIME('%s', time_submit) AS queue_time
+    FROM
+        task_jobs tj
+    LEFT JOIN
+        task_events te
+    ON
+        tj.name = te.name
+        AND tj.cycle = te.cycle
+        AND tj.submit_num = te.submit_num
+    GROUP BY tj.name, tj.cycle, tj.submit_num, tj.platform_name
+),
+time_stats AS (
+    SELECT
     name,
     cycle,
     submit_num,
@@ -362,67 +408,102 @@ def run_task_query(conn, workflow):
     job_id,
     platform_name,
     time_submit,
-
-    -- Calculate Queue time stats
-    MIN(queue_time) AS min_queue_time,
-    AVG(queue_time) AS mean_queue_time,
-    MAX(queue_time) AS max_queue_time,
-    AVG(queue_time * queue_time) AS mean_squares_queue_time,
-    MAX(CASE WHEN queue_time_quartile = 1 THEN queue_time END)
-        queue_quartile_1,
-    MAX(CASE WHEN queue_time_quartile = 2 THEN queue_time END)
-        queue_quartile_2,
-    MAX(CASE WHEN queue_time_quartile = 3 THEN queue_time END)
-        queue_quartile_3,
-
-    -- Calculate Run time stats
-    MIN(run_time) AS min_run_time,
-    AVG(run_time) AS mean_run_time,
-    MAX(run_time) AS max_run_time,
-    AVG(run_time * run_time) AS mean_squares_run_time,
-    MAX(CASE WHEN run_time_quartile = 1 THEN run_time END) run_quartile_1,
-    MAX(CASE WHEN run_time_quartile = 2 THEN run_time END) run_quartile_2,
-    MAX(CASE WHEN run_time_quartile = 3 THEN run_time END) run_quartile_3,
-
-    -- Calculate Total time stats
-    MIN(total_time) AS min_total_time,
-    AVG(total_time) AS mean_total_time,
-    MAX(total_time) AS max_total_time,
-    AVG(total_time * total_time) AS mean_squares_total_time,
-    MAX(CASE WHEN total_time_quartile = 1 THEN total_time END)
-        total_quartile_1,
-    MAX(CASE WHEN total_time_quartile = 2 THEN total_time END)
-        total_quartile_2,
-    MAX(CASE WHEN total_time_quartile = 3 THEN total_time END)
-        total_quartile_3,
-
-    COUNT(*) AS n
-
-FROM
-    (SELECT
-        *,
-        NTILE (4) OVER (PARTITION BY name ORDER BY queue_time)
-            queue_time_quartile,
-        NTILE (4) OVER (PARTITION BY name ORDER BY run_time)
-            run_time_quartile,
-        NTILE (4) OVER (PARTITION BY name ORDER BY total_time)
-            total_time_quartile
-    FROM
-        (SELECT
-            *,
-            STRFTIME('%s', time_run_exit) -
-                STRFTIME('%s', time_submit) AS total_time,
-            STRFTIME('%s', time_run_exit) -
-                STRFTIME('%s', time_run) AS run_time,
-            STRFTIME('%s', time_run) -
-                STRFTIME('%s', time_submit) AS queue_time
-        FROM
-            task_jobs
-        WHERE
-            run_status = 0))
-GROUP BY
-    name, platform_name;
+    queue_time,
+    run_time,
+    total_time,
+    mem_alloc,
+    NTILE(4) OVER (PARTITION BY name ORDER BY queue_time)
+        AS queue_time_quartile,
+    NTILE(4) OVER (PARTITION BY name ORDER BY run_time)
+        AS run_time_quartile,
+    NTILE(4) OVER (PARTITION BY name ORDER BY total_time)
+        AS total_time_quartile,
+    NTILE(4) OVER (PARTITION BY name ORDER BY CAST
+        (TRIM(REPLACE(max_rss, 'max_rss ', '')) AS INT)) AS max_rss_quartile,
+    CAST(TRIM(REPLACE(max_rss, 'max_rss ', '')) AS INT) AS max_rss,
+    NTILE(4) OVER (PARTITION BY name ORDER BY CAST
+        (TRIM(REPLACE(cpu_time, 'cpu_time ', '')) AS INT))
+        AS cpu_time_quartile,
+    CAST(TRIM(REPLACE(cpu_time, 'cpu_time ', '')) AS INT) AS cpu_time
+    FROM profiler_stats
+)
+SELECT
+    name,
+    cycle,
+    submit_num,
+    submit_status,
+    run_status,
+    time_run,
+    time_run_exit,
+    job_id,
+    platform_name,
+    time_submit,
+    mem_alloc,
+
+    -- Calculate Queue time stats
+    MIN(queue_time) AS min_queue_time,
+    CAST(AVG(queue_time) AS INT) AS mean_queue_time,
+    MAX(queue_time) AS max_queue_time,
+    SQRT(AVG(queue_time * queue_time) - AVG(queue_time) * AVG(queue_time))
+        AS stddev_queue_time,
+    MAX(CASE WHEN queue_time_quartile = 1 THEN queue_time END)
+        AS queue_quartile_1,
+    MAX(CASE WHEN queue_time_quartile = 2 THEN queue_time END)
+        AS queue_quartile_2,
+    MAX(CASE WHEN queue_time_quartile = 3 THEN queue_time END)
+        AS queue_quartile_3,
+
+    -- Calculate Run time stats
+    MIN(run_time) AS min_run_time,
+    CAST(AVG(run_time) AS INT) AS mean_run_time,
+    MAX(run_time) AS max_run_time,
+    SQRT(AVG(run_time * run_time) - AVG(run_time) * AVG(run_time))
+        AS stddev_run_time,
+    MAX(CASE WHEN run_time_quartile = 1 THEN run_time END) AS run_quartile_1,
+    MAX(CASE WHEN run_time_quartile = 2 THEN run_time END) AS run_quartile_2,
+    MAX(CASE WHEN run_time_quartile = 3 THEN run_time END) AS run_quartile_3,
+
+    -- Calculate Total time stats
+    MIN(total_time) AS min_total_time,
+    CAST(AVG(total_time) AS INT) AS mean_total_time,
+    MAX(total_time) AS max_total_time,
+    SQRT(AVG(total_time * total_time) - AVG(total_time) * AVG(total_time))
+        AS stddev_total_time,
+    MAX(CASE WHEN total_time_quartile = 1 THEN total_time END)
+        AS total_quartile_1,
+    MAX(CASE WHEN total_time_quartile = 2 THEN total_time END)
+        AS total_quartile_2,
+    MAX(CASE WHEN total_time_quartile = 3 THEN total_time END)
+        AS total_quartile_3,
+
+    -- Calculate RSS stats
+    MIN(max_rss) AS min_max_rss,
+    CAST(AVG(max_rss) AS INT) AS mean_max_rss,
+    MAX(max_rss) AS max_max_rss,
+    SQRT(AVG(max_rss * max_rss) - AVG(max_rss) * AVG(max_rss))
+        AS stddev_max_rss,
+    MAX(CASE WHEN max_rss_quartile = 1 THEN max_rss END) AS max_rss_quartile_1,
+    MAX(CASE WHEN max_rss_quartile = 2 THEN max_rss END) AS max_rss_quartile_2,
+    MAX(CASE WHEN max_rss_quartile = 3 THEN max_rss END) AS max_rss_quartile_3,
+
+    -- Calculate CPU time
+    MIN(cpu_time) AS min_cpu_time,
+    CAST(AVG(cpu_time) AS INT) AS mean_cpu_time,
+    MAX(cpu_time) AS max_cpu_time,
+    CAST(TOTAL(cpu_time) AS INT) AS total_cpu_time,
+    SQRT(AVG(cpu_time * cpu_time) - AVG(cpu_time) * AVG(cpu_time))
+        AS stddev_cpu_time,
+    MAX(CASE WHEN cpu_time_quartile = 1 THEN cpu_time END)
+        AS cpu_time_quartile_1,
+    MAX(CASE WHEN cpu_time_quartile = 2 THEN cpu_time END)
+        AS cpu_time_quartile_2,
+    MAX(CASE WHEN cpu_time_quartile = 3 THEN cpu_time END)
+        AS cpu_time_quartile_3,
+
+    COUNT(*) AS n
+FROM time_stats
+GROUP BY name, platform_name;
 '''):
+        total_of_totals += row[42]
         tasks.append({
             'id': workflow.duplicate(
                 cycle=row[1],
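The extraction arithmetic above relies on the token lengths: 'cpu_time ' is 9 characters, 'max_rss ' is 8 and 'mem_alloc ' is 10, so each value starts at INSTR() plus the token length and runs up to the next token. A standalone sanity check against the message format used in the test fixtures:

import sqlite3

conn = sqlite3.connect(':memory:')
msg = 'cpu_time 994 max_rss 40064 mem_alloc 1048576'
cpu, rss, mem = conn.execute(
    '''
    SELECT
        CAST(SUBSTR(:m, INSTR(:m, 'cpu_time ') + 9,
            INSTR(:m, ' max_rss') - (INSTR(:m, 'cpu_time ') + 9)) AS INT),
        CAST(SUBSTR(:m, INSTR(:m, 'max_rss ') + 8,
            INSTR(:m, ' mem_alloc') - (INSTR(:m, 'max_rss ') + 8)) AS INT),
        CAST(SUBSTR(:m, INSTR(:m, 'mem_alloc ') + 10) AS INT)
    ''',
    {'m': msg},
).fetchone()
assert (cpu, rss, mem) == (994, 40064, 1048576)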
@@ -438,37 +519,58 @@ def run_task_query(conn, workflow):
             'job_id': row[7],
             'platform': row[8],
             'submitted_time': row[9],
+            'mem_alloc': row[10],
             # Queue time stats
-            'min_queue_time': row[10],
-            'mean_queue_time': row[11],
-            'max_queue_time': row[12],
-            'std_dev_queue_time': (row[13] - row[11]**2)**0.5,
+            'min_queue_time': row[11],
+            'mean_queue_time': row[12],
+            'max_queue_time': row[13],
+            'std_dev_queue_time': row[14],
             # Prevents null entries when there are too few tasks for quartiles
-            'queue_quartiles': [row[14],
-                                row[14] if row[15] is None else row[15],
-                                row[14] if row[16] is None else row[16]],
+            'queue_quartiles': [row[15],
+                                row[15] if row[16] is None else row[16],
+                                row[15] if row[17] is None else row[17]],
             # Run time stats
-            'min_run_time': row[17],
-            'mean_run_time': row[18],
-            'max_run_time': row[19],
-            'std_dev_run_time': (row[20] - row[18]**2)**0.5,
+            'min_run_time': row[18],
+            'mean_run_time': row[19],
+            'max_run_time': row[20],
+            'std_dev_run_time': row[21],
             # Prevents null entries when there are too few tasks for quartiles
-            'run_quartiles': [row[21],
-                              row[21] if row[22] is None else row[22],
-                              row[21] if row[23] is None else row[23]],
+            'run_quartiles': [row[22],
+                              row[22] if row[23] is None else row[23],
+                              row[22] if row[24] is None else row[24]],
             # Total
-            'min_total_time': row[24],
-            'mean_total_time': row[25],
-            'max_total_time': row[26],
-            'std_dev_total_time': (row[27] - row[25] ** 2) ** 0.5,
+            'min_total_time': row[25],
+            'mean_total_time': row[26],
+            'max_total_time': row[27],
+            'std_dev_total_time': row[28],
             # Prevents null entries when there are too few tasks for quartiles
-            'total_quartiles': [row[28],
-                                row[28] if row[29] is None else row[29],
-                                row[28] if row[30] is None else row[30]],
-
-            'count': row[31]
+            'total_quartiles': [row[29],
+                                row[29] if row[30] is None else row[30],
+                                row[29] if row[31] is None else row[31]],
+            # Max RSS stats
+            'min_max_rss': row[32],
+            'mean_max_rss': row[33],
+            'max_max_rss': row[34],
+            'std_dev_max_rss': row[35],
+            # Prevents null entries when there are too few tasks for quartiles
+            'max_rss_quartiles': [row[36],
+                                  row[36] if row[37] is None else row[37],
+                                  row[36] if row[38] is None else row[38]],
+            # CPU time stats
+            'min_cpu_time': row[39],
+            'mean_cpu_time': row[40],
+            'max_cpu_time': row[41],
+            'total_cpu_time': row[42],
+            'std_dev_cpu_time': row[43],
+            # Prevents null entries when there are too few tasks for quartiles
+            'cpu_time_quartiles': [row[44],
+                                   row[44] if row[45] is None else row[45],
+                                   row[44] if row[46] is None else row[46]],
+            'count': row[47]
         })
+    for task in tasks:
+        task['total_of_totals'] = total_of_totals
     return tasks
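A note on the quartile fallbacks in the mapping above: NTILE(4) assigns rows to at most as many buckets as there are rows, so with fewer than four jobs the bucket 2 and 3 aggregates come back as NULL and the Python side substitutes the first-quartile value. A small demonstration (window functions need SQLite 3.25+), using the two run times from the tests, 540 s and 644 s:

import sqlite3

conn = sqlite3.connect(':memory:')
rows = conn.execute(
    '''
    WITH t(run_time) AS (VALUES (540), (644))
    SELECT run_time, NTILE(4) OVER (ORDER BY run_time) FROM t
    '''
).fetchall()
assert rows == [(540, 1), (644, 2)]  # buckets 3 and 4 never assigned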
@@ -598,9 +700,9 @@ def run_jobs_query(
         for id_ in ids:
             item = []
             for token, column in (
-                ('cycle', 'cycle'),
-                ('task', 'name'),
-                ('job', 'submit_num'),
+                ('cycle', 'data.cycle'),
+                ('task', 'data.name'),
+                ('job', 'data.submit_num'),
             ):
                 value = id_[token]
                 if value:
@@ -623,9 +725,9 @@ def run_jobs_query(
         for id_ in exids:
             items = []
             for token, column in (
-                ('cycle', 'cycle'),
-                ('task', 'name'),
-                ('job', 'submit_num'),
+                ('cycle', 'data.cycle'),
+                ('task', 'data.name'),
+                ('job', 'data.submit_num'),
             ):
                 value = id_[token]
                 if value:
@@ -644,19 +746,19 @@ def run_jobs_query(
             if submit_status is None:
                 # hasn't yet submitted (i.e. there is no job)
                 continue
-            item = [r'IFNULL(submit_status,999) = ?']
+            item = [r'IFNULL(data.submit_status,999) = ?']
             where_args.append(submit_status)
 
             if run_status is None:
-                item.append('run_status IS NULL')
+                item.append('data.run_status IS NULL')
             else:
-                item.append(r'IFNULL(run_status,999) = ?')
+                item.append(r'IFNULL(data.run_status,999) = ?')
                 where_args.append(run_status)
 
             if time_run is None:
-                item.append(r'time_run IS NULL')
+                item.append(r'data.time_run IS NULL')
             else:
-                item.append(r'time_run NOT NULL')
+                item.append(r'data.time_run NOT NULL')
 
             items.append(r'(' + ' AND '.join(item) + r')')
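For reference, the fragments assembled above join into one parenthesised predicate per requested job state; an illustrative check (the column strings are the ones introduced in this diff, the chosen state is hypothetical):

item = [
    r'IFNULL(data.submit_status,999) = ?',
    r'data.run_status IS NULL',
    r'data.time_run NOT NULL',
]
assert (r'(' + ' AND '.join(item) + r')') == (
    '(IFNULL(data.submit_status,999) = ?'
    ' AND data.run_status IS NULL'
    ' AND data.time_run NOT NULL)'
)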
@@ -670,60 +772,92 @@ def run_jobs_query(
             if submit_status is None:
                 # hasn't yet submitted (i.e. there is no job)
                 continue
-            item = [r'IFNULL(submit_status,999) = ?']
+            item = [r'IFNULL(data.submit_status,999) = ?']
             where_args.append(submit_status)
 
             if run_status is None:
-                item.append(r'run_status IS NULL')
+                item.append(r'data.run_status IS NULL')
             else:
-                item.append(r'IFNULL(run_status,999) = ?')
+                item.append(r'IFNULL(data.run_status,999) = ?')
                 where_args.append(run_status)
 
             if time_run is None:
-                item.append(r'time_run IS NULL')
+                item.append(r'data.time_run IS NULL')
             else:
-                item.append(r'time_run NOT NULL')
+                item.append(r'data.time_run NOT NULL')
 
             where_stmts.append(r'NOT (' + ' AND '.join(item) + r')')
 
     # filter by task name (special UIS argument for namespace queries)
     if tasks:
         where_stmts.append(
-            r'(name = '
-            + r" OR name = ".join('?' for task in tasks)
+            r'(data.name = '
+            + r" OR data.name = ".join('?' for task in tasks)
             + r')'
         )
         where_args.extend(tasks)
 
     # build the SQL query
-    submit_num = 'max(submit_num)' if jobNN else 'submit_num'
+    submit_num = 'max(data.submit_num)' if jobNN else 'data.submit_num'
     query = rf'''
+WITH data AS (
+    SELECT
+        tj.*,
+        CAST(
+            SUBSTR(
+                te.message,
+                INSTR(te.message, 'mem_alloc ') + 10
+            ) AS INT
+        ) AS mem_alloc,
+        CAST(
+            SUBSTR(
+                te.message,
+                INSTR(te.message, 'max_rss ') + 8,
+                INSTR(te.message, ' mem_alloc') -
+                (INSTR(te.message, 'max_rss ') + 8)
+            ) AS INT
+        ) AS max_rss,
+        CAST(
+            SUBSTR(
+                te.message,
+                INSTR(te.message, 'cpu_time ') + 9,
+                INSTR(te.message, ' max_rss') -
+                (INSTR(te.message, 'cpu_time ') + 9)
+            ) AS INT
+        ) AS cpu_time
+    FROM
+        task_jobs tj
+    LEFT JOIN
+        task_events te ON tj.name = te.name AND tj.cycle = te.cycle AND
+        tj.submit_num = te.submit_num AND te.message LIKE '%cpu_time%'
+)
 SELECT
-    name,
-    cycle AS cycle_point,
+    data.name,
+    data.cycle AS cycle_point,
     {submit_num} AS submit_num,
-    submit_status,
-    time_run AS started_time,
-    time_run_exit AS finished_time,
-    job_id,
+    data.submit_status,
+    data.time_run AS started_time,
+    data.time_run_exit AS finished_time,
+    data.job_id,
     job_runner_name,
-    platform_name AS platform,
-    time_submit AS submitted_time,
-    STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit)
-        AS total_time,
-    STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run)
-        AS run_time,
-    STRFTIME('%s', time_run) - STRFTIME('%s', time_submit)
-        AS queue_time,
-    run_status
-FROM
-    task_jobs
+    data.platform_name AS platform,
+    data.time_submit AS submitted_time,
+    STRFTIME('%s', data.time_run_exit) -
+        STRFTIME('%s', data.time_submit) AS total_time,
+    STRFTIME('%s', data.time_run_exit) -
+        STRFTIME('%s', data.time_run) AS run_time,
+    STRFTIME('%s', data.time_run) -
+        STRFTIME('%s', data.time_submit) AS queue_time,
+    data.run_status,
+    data.max_rss,
+    data.cpu_time,
+    data.mem_alloc
+FROM data
 '''
     if where_stmts:
-        query += 'WHERE ' + ' AND '.join(where_stmts)
+        query += 'WHERE ' + ' AND '.join(where_stmts)
     if jobNN:
-        query += ' GROUP BY name, cycle'
-
+        query += ' GROUP BY data.name, data.cycle'
     for row in conn.execute(query, where_args):
         row = dict(row)
         # determine job status
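With the queries above, the standard deviations now come back ready-made from SQL as SQRT(AVG(x * x) - AVG(x) * AVG(x)), the population form previously computed in Python as (mean_squares - mean ** 2) ** 0.5. Note that SQLite's SQRT() is only available when the built-in math functions are compiled in (SQLITE_ENABLE_MATH_FUNCTIONS, SQLite 3.35+). A quick check of the identity with the cpu_time fixtures from the tests (994 and 1994):

xs = [994, 1994]
mean = sum(xs) / len(xs)                      # 1494.0
mean_sq = sum(x * x for x in xs) / len(xs)    # 2482036.0
assert (mean_sq - mean ** 2) ** 0.5 == 500.0  # population std dev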
@@ -753,33 +887,56 @@ class UISTask(Task):
     platform = graphene.String()
+
     min_total_time = graphene.Int()
     mean_total_time = graphene.Int()
     max_total_time = graphene.Int()
-    std_dev_total_time = graphene.Int()
-    queue_quartiles = graphene.List(
+    std_dev_total_time = graphene.Float()
+    total_quartiles = graphene.List(
         graphene.Int,
         description=sstrip('''
-        List containing the first, second,
-        third and forth quartile queue times.'''))
+            List containing the first, second,
+            third and fourth quartile total times.'''))
     min_queue_time = graphene.Int()
     mean_queue_time = graphene.Int()
     max_queue_time = graphene.Int()
-    std_dev_queue_time = graphene.Int()
-    run_quartiles = graphene.List(
+    std_dev_queue_time = graphene.Float()
+    queue_quartiles = graphene.List(
         graphene.Int,
         description=sstrip('''
-        List containing the first, second,
-        third and forth quartile run times.'''))
+            List containing the first, second,
+            third and fourth quartile queue times.'''))
     min_run_time = graphene.Int()
     mean_run_time = graphene.Int()
    max_run_time = graphene.Int()
-    std_dev_run_time = graphene.Int()
-    total_quartiles = graphene.List(
+    std_dev_run_time = graphene.Float()
+    run_quartiles = graphene.List(
         graphene.Int,
         description=sstrip('''
-        List containing the first, second,
-        third and forth quartile total times.'''))
+            List containing the first, second,
+            third and fourth quartile run times.'''))
+    max_rss = graphene.Int()
+    min_max_rss = graphene.Int()
+    mean_max_rss = graphene.Int()
+    max_max_rss = graphene.Int()
+    std_dev_max_rss = graphene.Float()
+    max_rss_quartiles = graphene.List(
+        graphene.Int,
+        description=sstrip('''
+            List containing the first, second,
+            third and fourth quartile for Max RSS.'''))
+    min_cpu_time = graphene.Int()
+    mean_cpu_time = graphene.Int()
+    max_cpu_time = graphene.Int()
+    total_cpu_time = graphene.Int()
+    std_dev_cpu_time = graphene.Float()
+    cpu_time_quartiles = graphene.List(
+        graphene.Int,
+        description=sstrip('''
+            List containing the first, second,
+            third and fourth quartile for CPU time.'''))
+    total_of_totals = graphene.Int()
+    mem_alloc = graphene.Int()
     count = graphene.Int()
 
@@ -788,6 +945,8 @@ class UISJob(Job):
     total_time = graphene.Int()
     queue_time = graphene.Int()
     run_time = graphene.Int()
+    max_rss = graphene.Int()
+    cpu_time = graphene.Int()
 
 
 class UISQueries(Queries):
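A hypothetical client query exercising the new analysis fields; the camelCase names assume graphene's default snake_case to camelCase conversion, and the tasks/workflows shape assumes the existing UIS analysis query rather than anything introduced here:

ANALYSIS_QUERY = '''
query ($workflows: [ID]) {
  tasks(workflows: $workflows) {
    name
    platform
    meanRunTime
    meanCpuTime
    totalCpuTime
    stdDevCpuTime
    cpuTimeQuartiles
    meanMaxRss
    maxRssQuartiles
    memAlloc
    count
  }
}
'''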
diff --git a/cylc/uiserver/tests/test_workflow_retrieval.py b/cylc/uiserver/tests/test_workflow_retrieval.py
index 9fc957df..7cd7a036 100644
--- a/cylc/uiserver/tests/test_workflow_retrieval.py
+++ b/cylc/uiserver/tests/test_workflow_retrieval.py
@@ -33,7 +33,7 @@
 )
 
 
-def make_db(*task_entries):
+def make_db(task_entries, task_events=None):
     """Create a DB and populate the task_jobs table."""
     conn = sqlite3.connect(':memory:')
     conn.row_factory = sqlite3.Row
@@ -62,21 +62,37 @@ def make_db(*task_entries):
     '''
     )
 
+    conn.execute(
+        '''
+        CREATE TABLE
+            task_events(
+                cycle TEXT,
+                name TEXT,
+                submit_num INTEGER,
+                time TEXT,
+                event TEXT,
+                message TEXT);''')
+
     conn.executemany(
         'INSERT into task_jobs VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
-        task_entries,
+        task_entries
     )
+    if task_events:
+        conn.executemany(
+            'INSERT into task_events VALUES (?,?,?,?,?,?)',
+            task_events
+        )
     conn.commit()
     return conn
 
 
 def test_make_task_query_1():
     conn = make_db(
-        (
+        task_entries=[(
             '1',
             'Task_1',
-            '01',
-            '{1}',
+            1,
+            '[1]',
             0,
             1,
             '2022-12-14T15:00:00Z',
@@ -89,7 +105,41 @@ def test_make_task_query_1():
             'MyPlatform',
             'User',
             'UsersJob',
-        )
+        )],
+        task_events=[
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:10:00Z',
+                'message debug',
+                'cpu_time 994 max_rss 40064 mem_alloc 1048576'
+            )
+        ]
     )
 
     workflow = Tokens('~user/workflow')
@@ -101,17 +151,25 @@ def test_make_task_query_1():
     assert ret['queue_quartiles'] == [60, 60, 60]
     assert ret['run_quartiles'] == [540, 540, 540]
     assert ret['total_quartiles'] == [600, 600, 600]
+    assert ret['max_rss_quartiles'] == [40064, 40064, 40064]
+    assert ret['cpu_time_quartiles'] == [994, 994, 994]
     assert ret['id'].id == '~user/workflow//1/Task_1/01'
     assert ret['job_id'] == 'UsersJob'
     assert ret['max_queue_time'] == 60
     assert ret['max_run_time'] == 540
     assert ret['max_total_time'] == 600
+    assert ret['max_max_rss'] == 40064
+    assert ret['max_cpu_time'] == 994
     assert ret['mean_queue_time'] == pytest.approx(60.0, 0.01)
     assert ret['mean_run_time'] == pytest.approx(540.0, 0.01)
     assert ret['mean_total_time'] == pytest.approx(600.0, 0.01)
+    assert ret['mean_max_rss'] == pytest.approx(40064.0, 0.01)
+    assert ret['mean_cpu_time'] == pytest.approx(994.0, 0.01)
     assert ret['min_queue_time'] == 60
     assert ret['min_run_time'] == 540
     assert ret['min_total_time'] == 600
+    assert ret['min_max_rss'] == 40064
+    assert ret['min_cpu_time'] == 994
     assert ret['name'] == 'Task_1'
     assert ret['platform'] == 'MyPlatform'
     assert ret['started_time'] == '2022-12-14T15:01:00Z'
@@ -119,48 +177,119 @@ def test_make_task_query_1():
     assert ret['std_dev_queue_time'] == pytest.approx(0.0, 0.01)
     assert ret['std_dev_run_time'] == pytest.approx(0.0, 0.01)
     assert ret['std_dev_total_time'] == pytest.approx(0.0, 0.01)
+    assert ret['std_dev_max_rss'] == pytest.approx(0.0, 0.01)
+    assert ret['std_dev_cpu_time'] == pytest.approx(0.0, 0.01)
     assert ret['submit_num'] == 1
     assert ret['submitted_time'] == '2022-12-14T15:00:00Z'
+    assert ret['mem_alloc'] == 1048576
 
 
 def test_make_task_query_2():
     conn = make_db(
-        (
-            '1',
-            'Task_1',
-            '01',
-            '{1}',
-            0,
-            1,
-            '2022-12-14T15:00:00Z',
-            '2022-12-14T15:01:00Z',
-            0,
-            '2022-12-14T15:01:00Z',
-            '2022-12-14T15:10:00Z',
-            None,
-            0,
-            'MyPlatform',
-            'User',
-            'UsersJob',
-        ),
-        (
-            '2',
-            'Task_1',
-            '01',
-            '{1}',
-            0,
-            1,
-            '2022-12-15T15:00:00Z',
-            '2022-12-15T15:01:15Z',
-            0,
-            '2022-12-15T15:01:16Z',
-            '2022-12-15T15:12:00Z',
-            None,
-            0,
-            'MyPlatform',
-            'User',
-            'UsersJob',
-        ),
+        task_entries=[
+            (
+                '1',
+                'Task_1',
+                1,
+                '[1]',
+                0,
+                1,
+                '2022-12-14T15:00:00Z',
+                '2022-12-14T15:01:00Z',
+                0,
+                '2022-12-14T15:01:00Z',
+                '2022-12-14T15:10:00Z',
+                None,
+                0,
+                'MyPlatform',
+                'User',
+                'UsersJob',
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '[1]',
+                0,
+                1,
+                '2022-12-15T15:00:00Z',
+                '2022-12-15T15:01:15Z',
+                0,
+                '2022-12-15T15:01:16Z',
+                '2022-12-15T15:12:00Z',
+                None,
+                0,
+                'MyPlatform',
+                'User',
+                'UsersJob',
+            )
+        ],
+        task_events=[
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:10:00Z',
+                'message debug',
+                'cpu_time 994 max_rss 40064'
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:10:00Z',
+                'message debug',
+                'cpu_time 1994 max_rss 50064'
+            ),
+        ]
     )
     conn.commit()
     workflow = Tokens('~user/workflow')
@@ -197,68 +326,166 @@ def test_make_task_query_3():
     conn = make_db(
-        (
-            '1',
-            'Task_1',
-            '01',
-            '{1}',
-            0,
-            1,
-            '2022-12-14T15:00:00Z',
-            '2022-12-14T15:01:00Z',
-            0,
-            '2022-12-14T15:01:00Z',
-            '2022-12-14T15:10:00Z',
-            None,
-            0,
-            'MyPlatform',
-            'User',
-            'UsersJob',
-        ),
-        (
-            '2',
-            'Task_1',
-            '01',
-            '{1}',
-            0,
-            1,
-            '2022-12-15T15:00:00Z',
-            '2022-12-15T15:01:15Z',
-            0,
-            '2022-12-15T15:01:16Z',
-            '2022-12-15T15:12:00Z',
-            None,
-            0,
-            'MyPlatform',
-            'User',
-            'UsersJob',
-        ),
-        (
-            '3',
-            'Task_1',
-            '01',
-            '{1}',
-            0,
-            1,
-            '2022-12-16T15:00:00Z',
-            '2022-12-16T15:01:15Z',
-            0,
-            '2022-12-16T15:01:16Z',
-            '2022-12-16T15:12:00Z',
-            None,
-            0,
-            'MyPlatform',
-            'User',
-            'UsersJob',
-        ),
+        task_entries=[
+            (
+                '1',
+                'Task_1',
+                1,
+                '[1]',
+                0,
+                1,
+                '2022-12-14T15:00:00Z',
+                '2022-12-14T15:01:00Z',
+                0,
+                '2022-12-14T15:01:00Z',
+                '2022-12-14T15:10:00Z',
+                None,
+                0,
+                'MyPlatform',
+                'User',
+                'UsersJob',
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '[1]',
+                0,
+                1,
+                '2022-12-15T15:00:00Z',
+                '2022-12-15T15:01:15Z',
+                0,
+                '2022-12-15T15:01:16Z',
+                '2022-12-15T15:12:00Z',
+                None,
+                0,
+                'MyPlatform',
+                'User',
+                'UsersJob',
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '[1]',
+                0,
+                1,
+                '2022-12-16T15:00:00Z',
+                '2022-12-16T15:01:15Z',
+                0,
+                '2022-12-16T15:01:16Z',
+                '2022-12-16T15:12:00Z',
+                None,
+                0,
+                'MyPlatform',
+                'User',
+                'UsersJob',
+            )
+        ],
+        task_events=[
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:10:00Z',
+                'message debug',
+                'cpu_time 994 max_rss 40064'
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:10:00Z',
+                'message debug',
+                'cpu_time 1994 max_rss 50064'
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '2022-12-14T17:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '2022-12-14T17:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '2022-12-14T17:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '2022-12-14T17:10:00Z',
+                'message debug',
+                'cpu_time 2994 max_rss 60064'
+            ),
+        ]
     )
     conn.commit()
     workflow = Tokens('~user/workflow')
 
-    ret = run_task_query(conn, workflow)
+    ret = run_task_query(conn, workflow)[0]
 
-    assert len(ret) == 1
-    ret = ret[0]
     assert ret['count'] == 3
     assert ret['cycle_point'] == '3'
     assert ret['finished_time'] == '2022-12-16T15:12:00Z'
@@ -293,28 +520,18 @@ def test_make_task_query_different_platforms():
 
     https://github.com/cylc/cylc-uiserver/issues/696
     """
-    conn = sqlite3.connect(':memory:')
-    conn.execute('''CREATE TABLE task_jobs(cycle TEXT, name TEXT,
-        submit_num INTEGER, flow_nums TEXT, is_manual_submit INTEGER,
-        try_num INTEGER, time_submit TEXT, time_submit_exit TEXT,
-        submit_status INTEGER, time_run TEXT, time_run_exit TEXT,
-        run_signal TEXT, run_status INTEGER, platform_name TEXT,
-        job_runner_name TEXT, job_id TEXT,
-        PRIMARY KEY(cycle, name, submit_num));''')
     middle_values = (
        'Task_1', '01', '[1]', 0, 1,
        '2022-12-14T15:00:00Z', '2022-12-14T15:01:00Z',
        0, '2022-12-14T15:01:00Z',
        '2022-12-14T15:10:00Z',
        None, 0,
    )
-    conn.executemany(
-        'INSERT into task_jobs VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
-        [
-            ('1', *middle_values, 'MyPlatform', 'User', 'UsersJob'),
-            ('2', *middle_values, 'MyPlatform2', 'User', 'UsersJob'),
-            ('3', *middle_values, 'MyPlatform3', 'User', 'UsersJob'),
-        ],
-    )
+    conn = make_db(task_entries=[
+        ('1', *middle_values, 'MyPlatform', 'User', 'UsersJob'),
+        ('2', *middle_values, 'MyPlatform2', 'User', 'UsersJob'),
+        ('3', *middle_values, 'MyPlatform3', 'User', 'UsersJob'),
+    ])
+    conn.commit()
 
     workflow = Tokens('~user/workflow')
@@ -327,60 +544,160 @@ def test_make_jobs_query_1():
     conn = make_db(
-        (
-            '1',
-            'Task_1',
-            '01',
-            '{1}',
-            0,
-            1,
-            '2022-12-14T15:00:00Z',
-            '2022-12-14T15:01:00Z',
-            0,
-            '2022-12-14T15:01:00Z',
-            '2022-12-14T15:10:00Z',
-            None,
-            0,
-            'MyPlatform',
-            'User',
-            'UsersJob',
-        ),
-        (
-            '2',
-            'Task_1',
-            '01',
-            '{1}',
-            0,
-            1,
-            '2022-12-15T15:00:00Z',
-            '2022-12-15T15:01:15Z',
-            0,
-            '2022-12-15T15:01:16Z',
-            '2022-12-15T15:12:00Z',
-            None,
-            0,
-            'MyPlatform',
-            'User',
-            'UsersJob',
-        ),
-        (
-            '3',
-            'Task_1',
-            '01',
-            '{1}',
-            0,
-            1,
-            '2022-12-16T15:00:00Z',
-            '2022-12-16T15:01:15Z',
-            0,
-            '2022-12-16T15:01:16Z',
-            '2022-12-16T15:12:00Z',
-            None,
-            0,
-            'MyPlatform',
-            'User',
-            'UsersJob',
-        ),
+        task_entries=[
+            (
+                '1',
+                'Task_1',
+                1,
+                '[1]',
+                0,
+                1,
+                '2022-12-14T15:00:00Z',
+                '2022-12-14T15:01:00Z',
+                0,
+                '2022-12-14T15:01:00Z',
+                '2022-12-14T15:10:00Z',
+                None,
+                0,
+                'MyPlatform',
+                'User',
+                'UsersJob',
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '[1]',
+                0,
+                1,
+                '2022-12-15T15:00:00Z',
+                '2022-12-15T15:01:15Z',
+                0,
+                '2022-12-15T15:01:16Z',
+                '2022-12-15T15:12:00Z',
+                None,
+                0,
+                'MyPlatform',
+                'User',
+                'UsersJob',
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '[1]',
+                0,
+                1,
+                '2022-12-16T15:00:00Z',
+                '2022-12-16T15:01:15Z',
+                0,
+                '2022-12-16T15:01:16Z',
+                '2022-12-16T15:12:00Z',
+                None,
+                0,
+                'MyPlatform',
+                'User',
+                'UsersJob',
+            )
+        ],
+        task_events=[
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '1',
+                'Task_1',
+                1,
+                '2022-12-14T15:10:00Z',
+                'message debug',
+                '{ "max_rss": 40064, "cpu_time": 994}'
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '2',
+                'Task_1',
+                1,
+                '2022-12-14T16:10:00Z',
+                'message debug',
+                '{ "max_rss": 50064, "cpu_time": 1994}'
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '2022-12-14T17:00:00Z',
+                'submitted',
+                ''
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '2022-12-14T17:01:00Z',
+                'started',
+                ''
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '2022-12-14T17:10:00Z',
+                'succeeded',
+                ''
+            ),
+            (
+                '3',
+                'Task_1',
+                1,
+                '2022-12-14T17:10:00Z',
+                'message debug',
+                '{ "max_rss": 60064, "cpu_time": 2994}'
+            ),
+        ]
     )
     conn.commit()
     workflow = Tokens('~user/workflow')
@@ -389,7 +706,6 @@ def test_make_jobs_query_1():
     ret = run_jobs_query(conn, workflow, tasks)
 
     assert len(ret) == 3
-
     assert ret[0]['cycle_point'] == '1'
     assert ret[0]['finished_time'] == '2022-12-14T15:10:00Z'
     assert ret[0]['id'].id == '~user/workflow//1/Task_1/01'
@@ -414,7 +730,6 @@
 
 async def test_list_elements(monkeypatch):
-
     with pytest.raises(Exception) as e_info:
         await list_elements('tasks', stuff=[1, 2, 3], workflows=[])
@@ -540,10 +855,10 @@ def make_job(task_name, submit_status, run_status, started):
     return (
         '1',
         task_name,
-        '01',
-        '{1}',
-        0,
         1,
+        '[1]',
+        0,
+        0,
         submit_start,
         submit_end,
         submit_status,
@@ -558,12 +873,13 @@
 
     # define a workflow with one job in each state
     conn = make_db(
-        make_job('waiting', None, None, None),
-        make_job('submitted', 0, None, False),
-        make_job('submit-failed', 1, None, False),
-        make_job('running', 0, None, True),
-        make_job('succeeded', 0, 0, True),
-        make_job('failed', 0, 1, True),
+        task_entries=[
+            make_job('waiting', None, None, None),
+            make_job('submitted', 0, None, False),
+            make_job('submit-failed', 1, None, False),
+            make_job('running', 0, None, True),
+            make_job('succeeded', 0, 0, True),
+            make_job('failed', 0, 1, True)],
     )
 
     workflow = Tokens('~user/workflow')
@@ -716,10 +1032,7 @@ def make_job(cycle: Union[str, int], name: str, submit_num: int):
         'background',
         '1701',
     )
-
-    conn = make_db(
-        *(make_job(*job) for job in jobs)
-    )
+    conn = make_db(task_entries=(make_job(*job) for job in jobs))
     workflow = Tokens('~user/workflow')
     result = run_jobs_query(
         conn, workflow, ids=[Tokens(i, relative=True) for i in query]
@@ -745,24 +1058,25 @@ async def test_e2e_jobs_query(monkeypatch: pytest.MonkeyPatch):
         'queueTime': 2,
         'runTime': 98,
     }
-    conn = make_db((
-        entry['cyclePoint'],
-        entry['name'],
-        entry['submitNum'],
-        '[1]',
-        0,
-        1,
-        entry['submittedTime'],
-        '2022-12-14T15:00:01Z',
-        0,
-        entry['startedTime'],
-        entry['finishedTime'],
-        None,
-        0,
-        entry['platform'],
-        entry['jobRunnerName'],
-        entry['jobId'],
-    ))
+    conn = make_db(
+        task_entries=[(
+            entry['cyclePoint'],
+            entry['name'],
+            entry['submitNum'],
+            '[1]',
+            0,
+            1,
+            entry['submittedTime'],
+            '2022-12-14T15:00:01Z',
+            0,
+            entry['startedTime'],
+            entry['finishedTime'],
+            None,
+            0,
+            entry['platform'],
+            entry['jobRunnerName'],
+            entry['jobId'],
+        )])
     mock_dao = Mock(
         return_value=Mock(
            __enter__=Mock(return_value=Mock(connect=lambda: conn)),