Skip to content

Commit 731b65f

Browse files
committed
perf(meta_process): parallelize all operations
Replace granular storage sub-phase timing (write_queries, store, sparql_upload, bulk_load) with a single wall-clock storage phase. Parallelize RDF file storage, query generation, and SPARQL uploads using fork processes. Remove batch_size parameter from upload function and add show_progress option.
1 parent a068aa1 commit 731b65f

File tree

6 files changed

+230
-429
lines changed

6 files changed

+230
-429
lines changed

oc_meta/run/benchmark/plotting.py

Lines changed: 20 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,6 @@
1919
RDF_CREATION_COLOR = '#C73E1D'
2020
STORAGE_COLOR = '#6A994E'
2121

22-
STORAGE_WRITE_QUERIES_DATA_COLOR = '#3498DB'
23-
STORAGE_WRITE_QUERIES_PROV_COLOR = '#9B59B6'
24-
STORAGE_STORE_DATA_COLOR = '#E67E22'
25-
STORAGE_STORE_PROV_COLOR = '#E74C3C'
26-
STORAGE_SPARQL_COLOR = '#1ABC9C'
27-
STORAGE_BULK_COLOR = '#2ECC71'
28-
2922
CURATION_REST_PHASES = [
3023
"curation__clean_id",
3124
"curation__merge_duplicates",
@@ -34,33 +27,6 @@
3427
"curation__finalize"
3528
]
3629

37-
STORAGE_SUB_PHASES = [
38-
"storage__write_queries_data",
39-
"storage__write_queries_prov",
40-
"storage__store_data",
41-
"storage__store_prov",
42-
"storage__sparql_upload",
43-
"storage__bulk_load"
44-
]
45-
46-
STORAGE_SUB_PHASE_COLORS = {
47-
"storage__write_queries_data": STORAGE_WRITE_QUERIES_DATA_COLOR,
48-
"storage__write_queries_prov": STORAGE_WRITE_QUERIES_PROV_COLOR,
49-
"storage__store_data": STORAGE_STORE_DATA_COLOR,
50-
"storage__store_prov": STORAGE_STORE_PROV_COLOR,
51-
"storage__sparql_upload": STORAGE_SPARQL_COLOR,
52-
"storage__bulk_load": STORAGE_BULK_COLOR,
53-
}
54-
55-
STORAGE_SUB_PHASE_LABELS = {
56-
"storage__write_queries_data": "Write queries data",
57-
"storage__write_queries_prov": "Write queries prov",
58-
"storage__store_data": "Store data",
59-
"storage__store_prov": "Store prov",
60-
"storage__sparql_upload": "SPARQL upload",
61-
"storage__bulk_load": "Bulk load",
62-
}
63-
6430

6531
def get_phase_duration_by_name(run: Dict[str, Any], phase_name: str) -> float:
6632
"""Get phase duration by name instead of index."""
@@ -85,8 +51,8 @@ def get_curation_rest(run: Dict[str, Any]) -> float:
8551

8652

8753
def get_storage_total(run: Dict[str, Any]) -> float:
88-
"""Calculate total storage time by summing sub-phases."""
89-
return sum(get_phase_duration_by_name(run, p) for p in STORAGE_SUB_PHASES)
54+
"""Get total storage time (wall-clock)."""
55+
return get_phase_duration_by_name(run, "storage")
9056

9157

9258
def apply_plot_style(ax, title: str, xlabel: str = None, ylabel: str = None, grid: bool = True):
@@ -114,50 +80,27 @@ def format_bar_labels(ax, bars, values: List[float], unit: str = "s"):
11480

11581

11682
def _draw_phase_breakdown(ax, collect_ids: float, curation_rest: float, rdf_time: float,
117-
storage_phases: Dict[str, float]):
118-
"""Draw stacked bar chart for phase breakdown (shared by single/multi run plots)."""
119-
# Curation stacked bar (Collect IDs + Rest)
120-
ax.bar(0, collect_ids, color=CURATION_COLLECT_IDS_COLOR, edgecolor='black', linewidth=0.5, width=0.6)
121-
ax.bar(0, curation_rest, bottom=collect_ids, color=CURATION_REST_COLOR, edgecolor='black', linewidth=0.5, width=0.6)
83+
storage_time: float):
84+
"""Draw bar chart for phase breakdown (shared by single/multi run plots)."""
12285
curation_total = collect_ids + curation_rest
12386

124-
# RDF creation bar
87+
ax.bar(0, collect_ids, color=CURATION_COLLECT_IDS_COLOR, edgecolor='black', linewidth=0.5, width=0.6)
88+
ax.bar(0, curation_rest, bottom=collect_ids, color=CURATION_REST_COLOR, edgecolor='black', linewidth=0.5, width=0.6)
12589
ax.bar(1, rdf_time, color=RDF_CREATION_COLOR, edgecolor='black', linewidth=1.5, width=0.6)
90+
ax.bar(2, storage_time, color=STORAGE_COLOR, edgecolor='black', linewidth=1.5, width=0.6)
12691

127-
# Storage stacked bar with all sub-phases
128-
bottom = 0
129-
storage_total = 0
130-
for phase_name in STORAGE_SUB_PHASES:
131-
duration = storage_phases.get(phase_name, 0)
132-
if duration > 0:
133-
ax.bar(2, duration, bottom=bottom, color=STORAGE_SUB_PHASE_COLORS[phase_name],
134-
edgecolor='black', linewidth=0.5, width=0.6)
135-
bottom += duration
136-
storage_total += duration
137-
138-
# Value labels
13992
ax.text(0, curation_total, f'{curation_total:.1f}s', ha='center', va='bottom', fontweight='bold')
14093
ax.text(1, rdf_time, f'{rdf_time:.1f}s', ha='center', va='bottom', fontweight='bold')
141-
ax.text(2, storage_total, f'{storage_total:.1f}s', ha='center', va='bottom', fontweight='bold')
94+
ax.text(2, storage_time, f'{storage_time:.1f}s', ha='center', va='bottom', fontweight='bold')
14295

14396
ax.set_xticks([0, 1, 2])
144-
ax.set_xticklabels(['Curation', 'RDF\ncreation', 'Storage\n+upload'])
97+
ax.set_xticklabels(['Curation', 'RDF\ncreation', 'Storage'])
14598

146-
# Legend for Curation and Storage sub-phases
14799
legend_patches = [
148100
Rectangle((0, 0), 1, 1, facecolor=CURATION_COLLECT_IDS_COLOR, edgecolor='black', linewidth=0.5),
149101
Rectangle((0, 0), 1, 1, facecolor=CURATION_REST_COLOR, edgecolor='black', linewidth=0.5),
150102
]
151103
legend_labels = ['Collect IDs', 'Curation rest']
152-
153-
# Add storage sub-phases that have non-zero values
154-
for phase_name in STORAGE_SUB_PHASES:
155-
if storage_phases.get(phase_name, 0) > 0:
156-
legend_patches.append(
157-
Rectangle((0, 0), 1, 1, facecolor=STORAGE_SUB_PHASE_COLORS[phase_name], edgecolor='black', linewidth=0.5)
158-
)
159-
legend_labels.append(STORAGE_SUB_PHASE_LABELS[phase_name])
160-
161104
ax.legend(legend_patches, legend_labels, loc='upper right', fontsize=6)
162105

163106

@@ -182,11 +125,7 @@ def plot_scalability_analysis(per_size_results: List[Dict[str, Any]], output_pat
182125
)
183126
phase_data["curation"].append(curation_sum)
184127
phase_data["rdf_creation"].append(size_data["statistics"]["rdf_creation_duration_seconds"]["mean"])
185-
# Sum all storage sub-phases
186-
storage_sum = sum(
187-
size_data["statistics"][f"{p}_duration_seconds"]["mean"] for p in STORAGE_SUB_PHASES
188-
)
189-
phase_data["storage_upload"].append(storage_sum)
128+
phase_data["storage_upload"].append(size_data["statistics"].get("storage_duration_seconds", {}).get("mean", 0))
190129
else:
191130
run = size_data["runs"][0]
192131
mean_durations.append(run["metrics"]["total_duration_seconds"])
@@ -202,7 +141,7 @@ def plot_scalability_analysis(per_size_results: List[Dict[str, Any]], output_pat
202141
apply_plot_style(axes[0, 1], 'Throughput vs dataset size', 'Dataset size (records)', 'Throughput (records/sec)')
203142

204143
phase_colors = [CURATION_COLLECT_IDS_COLOR, RDF_CREATION_COLOR, STORAGE_COLOR]
205-
phase_labels = ['Curation', 'RDF creation', 'Storage + upload']
144+
phase_labels = ['Curation', 'RDF creation', 'Storage']
206145

207146
x_positions = list(range(len(sizes)))
208147
bottom = [0] * len(sizes)
@@ -263,17 +202,13 @@ def plot_benchmark_results(all_runs: List[Dict[str, Any]], stats: Dict[str, Dict
263202
apply_plot_style(axes[0, 0], 'Total duration per run', 'Run number', 'Total duration (s)')
264203
axes[0, 0].legend()
265204

266-
# Phase breakdown with stacked Curation and Storage bars
205+
# Phase breakdown
267206
collect_ids_mean = stats["curation__collect_identifiers_duration_seconds"]["mean"]
268207
curation_rest_mean = sum(stats[f"{p}_duration_seconds"]["mean"] for p in CURATION_REST_PHASES)
269208
rdf_mean = stats["rdf_creation_duration_seconds"]["mean"]
209+
storage_mean = stats.get("storage_duration_seconds", {}).get("mean", 0)
270210

271-
storage_phases = {}
272-
for phase_name in STORAGE_SUB_PHASES:
273-
stat_key = f"{phase_name}_duration_seconds"
274-
storage_phases[phase_name] = stats.get(stat_key, {}).get("mean", 0)
275-
276-
_draw_phase_breakdown(axes[0, 1], collect_ids_mean, curation_rest_mean, rdf_mean, storage_phases)
211+
_draw_phase_breakdown(axes[0, 1], collect_ids_mean, curation_rest_mean, rdf_mean, storage_mean)
277212
apply_plot_style(axes[0, 1], 'Average phase duration breakdown', ylabel='Duration (s)', grid=False)
278213
axes[0, 1].grid(True, axis='y', alpha=0.3)
279214

@@ -282,7 +217,7 @@ def plot_benchmark_results(all_runs: List[Dict[str, Any]], stats: Dict[str, Dict
282217
rdf_times = [get_phase_duration_by_name(r, "rdf_creation") for r in all_runs]
283218
upload_times = [get_storage_total(r) for r in all_runs]
284219

285-
boxplot_phase_names = ['Curation', 'RDF\ncreation', 'Storage\n+upload']
220+
boxplot_phase_names = ['Curation', 'RDF\ncreation', 'Storage']
286221
boxplot_colors = [CURATION_COLLECT_IDS_COLOR, RDF_CREATION_COLOR, STORAGE_COLOR]
287222

288223
bp = axes[1, 0].boxplot([curation_times, rdf_times, upload_times],
@@ -328,13 +263,9 @@ def plot_single_run_results(run: Dict[str, Any], output_path: str):
328263
collect_ids = get_phase_duration_by_name(run, "curation__collect_identifiers")
329264
curation_rest = get_curation_rest(run)
330265
rdf_time = get_phase_duration_by_name(run, "rdf_creation")
266+
storage_time = get_phase_duration_by_name(run, "storage")
331267

332-
storage_phases = {
333-
phase_name: get_phase_duration_by_name(run, phase_name)
334-
for phase_name in STORAGE_SUB_PHASES
335-
}
336-
337-
_draw_phase_breakdown(axes[0], collect_ids, curation_rest, rdf_time, storage_phases)
268+
_draw_phase_breakdown(axes[0], collect_ids, curation_rest, rdf_time, storage_time)
338269
apply_plot_style(axes[0], 'Phase duration breakdown', ylabel='Duration (s)', grid=False)
339270
axes[0].grid(True, axis='y', alpha=0.3)
340271

@@ -375,21 +306,10 @@ def plot_incremental_progress(all_reports: List[Dict[str, Any]], output_path: st
375306
collect_ids_times = [_get_phase_duration_from_report(r["report"], "curation__collect_identifiers") for r in all_reports]
376307
curation_rest_times = [_get_curation_rest_from_report(r["report"]) for r in all_reports]
377308
rdf_times = [_get_phase_duration_from_report(r["report"], "rdf_creation") for r in all_reports]
309+
storage_times = [_get_phase_duration_from_report(r["report"], "storage") for r in all_reports]
378310
throughputs = [r["report"]["metrics"].get("throughput_records_per_sec", 0) for r in all_reports]
379311

380-
# Extract all storage sub-phase times
381-
storage_phase_times = {
382-
phase_name: [_get_phase_duration_from_report(r["report"], phase_name) for r in all_reports]
383-
for phase_name in STORAGE_SUB_PHASES
384-
}
385-
386-
# Calculate total times
387-
total_times = []
388-
for i in range(len(all_reports)):
389-
total = collect_ids_times[i] + curation_rest_times[i] + rdf_times[i]
390-
for phase_name in STORAGE_SUB_PHASES:
391-
total += storage_phase_times[phase_name][i]
392-
total_times.append(total)
312+
total_times = [c + cr + r + s for c, cr, r, s in zip(collect_ids_times, curation_rest_times, rdf_times, storage_times)]
393313

394314
mean_duration = sum(total_times) / len(total_times) if total_times else 0
395315
mean_throughput = sum(throughputs) / len(throughputs) if throughputs else 0
@@ -406,16 +326,7 @@ def plot_incremental_progress(all_reports: List[Dict[str, Any]], output_path: st
406326
bottom += np.array(curation_rest_times)
407327
ax1.bar(x, rdf_times, width, bottom=bottom, label='RDF creation', color=RDF_CREATION_COLOR, edgecolor='black', linewidth=0.3)
408328
bottom += np.array(rdf_times)
409-
410-
# Add all storage sub-phases
411-
for phase_name in STORAGE_SUB_PHASES:
412-
phase_times = storage_phase_times[phase_name]
413-
if any(t > 0 for t in phase_times):
414-
ax1.bar(x, phase_times, width, bottom=bottom,
415-
label=STORAGE_SUB_PHASE_LABELS[phase_name],
416-
color=STORAGE_SUB_PHASE_COLORS[phase_name],
417-
edgecolor='black', linewidth=0.3)
418-
bottom += np.array(phase_times)
329+
ax1.bar(x, storage_times, width, bottom=bottom, label='Storage', color=STORAGE_COLOR, edgecolor='black', linewidth=0.3)
419330

420331
ax1.axhline(y=mean_duration, color='#A23B72', linestyle='--', linewidth=2, label=f'Mean ({mean_duration:.1f}s)')
421332

0 commit comments

Comments (0)