diff --git a/data_validation.py b/data_validation.py index 54bb4be..220cad2 100644 --- a/data_validation.py +++ b/data_validation.py @@ -2,27 +2,27 @@ from prefect import flow, get_run_logger, task -from export_tools import initialize_tiled_client +from export_tools import get_run @task(retries=2, retry_delay_seconds=10) -def read_all_streams(uid, beamline_acronym="haxpes"): +def read_stream(run, stream): + stream_data = run[stream].read() + return stream_data + + +@flow +def data_validation(uid, api_key=None): logger = get_run_logger() - tiled_client = initialize_tiled_client(beamline_acronym) - run = tiled_client[uid] - logger.info(f"Validating uid {run.start['uid']}") + run = get_run(uid, api_key=api_key) + logger.info(f"Validating uid {uid}") start_time = time.monotonic() for stream in run: logger.info(f"{stream}:") stream_start_time = time.monotonic() - stream_data = run[stream].read() + stream_data = read_stream(run, stream) stream_elapsed_time = time.monotonic() - stream_start_time logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") elapsed_time = time.monotonic() - start_time logger.info(f"{elapsed_time = }") - - -@flow -def general_data_validation(uid, beamline_acronym="haxpes"): - read_all_streams(uid, beamline_acronym) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 09f59e1..32e6ff4 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -1,9 +1,19 @@ +import os + +from dotenv import load_dotenv from prefect import flow, get_run_logger, task -from data_validation import general_data_validation +from data_validation import data_validation from general_exporter import export_switchboard +def get_api_key_from_env(): + with open("/srv/container.secret", "r") as secrets: + load_dotenv(stream=secrets) + api_key = os.environ["TILED_API_KEY"] + return api_key + + @task def log_completion(): logger = get_run_logger() @@ -11,8 +21,10 @@ def log_completion(): @flow -def end_of_run_workflow(stop_doc): +def end_of_run_workflow(stop_doc, api_key=None, dry_run=False): + if api_key is None: + api_key = get_api_key_from_env() uid = stop_doc["run_start"] - general_data_validation(uid, beamline_acronym="haxpes") - export_switchboard(uid, beamline_acronym="haxpes") + data_validation(uid, api_key=api_key) + export_switchboard(uid, api_key=api_key, dry_run=dry_run) log_completion() diff --git a/export_tools.py b/export_tools.py index de10f90..1352530 100644 --- a/export_tools.py +++ b/export_tools.py @@ -1,8 +1,8 @@ import re from numpy import column_stack, transpose -from prefect.blocks.system import Secret -from tiled.client import from_profile +from prefect import task +from tiled.client import from_uri def get_proposal_path(run): @@ -65,13 +65,6 @@ def get_photon_energy(run, default=0): return str(en) -def get_scantype(run): - if "scantype" in run.start.keys(): - return run.start["scantype"] - else: - return None - - def get_general_metadata(run): metadata = get_mono_md(run) @@ -79,7 +72,7 @@ def get_general_metadata(run): try: metadata["Proposal"] = str(run.start["proposal"]["proposal_id"]) except: - metadata["Proposal"] = unknown + metadata["Proposal"] = "Unknown" metadata["UID"] = get_md(run, "uid") metadata["Start Date/Time"] = get_md(run, "start_datetime") @@ -275,9 +268,11 @@ def get_generic_1d_data(run): return data_array -def initialize_tiled_client(beamline_acronym): - api_key = Secret.load(f"tiled-{beamline_acronym}-api-key", _sync=True).get() - return from_profile("nsls2", api_key=api_key)[beamline_acronym]["raw"] +@task(retries=2, retry_delay_seconds=10) +def get_run(uid, api_key=None): + tiled_client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key) + run = tiled_client["haxpes/raw"][uid] + return run def generate_file_name(run, extension): diff --git a/file_exporter.py b/file_exporter.py index 037fa7e..0c3608c 100644 --- a/file_exporter.py +++ b/file_exporter.py @@ -3,14 +3,17 @@ from os import makedirs from os.path import exists, splitext +import h5py import numpy as np +from prefect import get_run_logger, task from export_tools import * -def export_peak_xps(uid, beamline_acronym="haxpes"): - catalog = initialize_tiled_client(beamline_acronym) - run = catalog[uid] +@task +def export_peak_xps(uid, api_key=None, dry_run=False): + logger = get_run_logger() + run = get_run(uid, api_key=api_key) metadata = get_metadata_xps(run) header = make_header(metadata, "xps") @@ -19,12 +22,16 @@ def export_peak_xps(uid, beamline_acronym="haxpes"): if not exists(export_path): makedirs(export_path) filename = export_path + generate_file_name(run, "csv") - np.savetxt(filename, data, delimiter=",", header=header) + if dry_run: + logger.info(f"Dry run: not exporting peak XPS data to {filename}") + else: + np.savetxt(filename, data, delimiter=",", header=header) -def export_ses_xps(uid, beamline_acronym="haxpes"): - catalog = initialize_tiled_client(beamline_acronym) - run = catalog[uid] +@task +def export_ses_xps(uid, api_key=None, dry_run=False): + logger = get_run_logger() + run = get_run(uid, api_key=api_key) metadata = get_metadata_xps(run) header = make_header(metadata, "xps") @@ -35,17 +42,21 @@ def export_ses_xps(uid, beamline_acronym="haxpes"): makedirs(export_path) filename = generate_file_name(run, "md") out_path = export_path + filename - write_header_only(out_path, header) - ses_files = glob(f"{ses_path}*_{scan_id}_*") - for ses_file in ses_files: - ext = splitext(ses_file)[1] - out_path = export_path + generate_file_name(run, ext) - shutil.copy(ses_file, out_path) - - -def export_xas(uid, beamline_acronym="haxpes"): - catalog = initialize_tiled_client(beamline_acronym) - run = catalog[uid] + if dry_run: + logger.info(f"Dry run: not exporting SES XPS data to {export_path}") + else: + write_header_only(out_path, header) + ses_files = glob(f"{ses_path}*_{scan_id}_*") + for ses_file in ses_files: + ext = splitext(ses_file)[1] + out_path = export_path + generate_file_name(run, ext) + shutil.copy(ses_file, out_path) + + +@task +def export_xas(uid, api_key=None, dry_run=False): + logger = get_run_logger() + run = get_run(uid, api_key=api_key) detlist = run.start["detectors"] metadata = get_general_metadata(run) @@ -56,12 +67,16 @@ def export_xas(uid, beamline_acronym="haxpes"): if not exists(export_path): makedirs(export_path) filename = export_path + generate_file_name(run, "csv") - np.savetxt(filename, data, delimiter=",", header=header) + if dry_run: + logger.info(f"Dry run: not exporting XAS data to {filename}") + else: + np.savetxt(filename, data, delimiter=",", header=header) -def export_generic_1D(uid, beamline_acronym="haxpes"): - catalog = initialize_tiled_client(beamline_acronym) - run = catalog[uid] +@task +def export_generic_1D(uid, api_key=None, dry_run=False): + logger = get_run_logger() + run = get_run(uid, api_key=api_key) detlist = run.start["detectors"] metadata = get_general_metadata(run) @@ -72,12 +87,15 @@ def export_generic_1D(uid, beamline_acronym="haxpes"): if not exists(export_path): makedirs(export_path) filename = export_path + generate_file_name(run, "csv") - np.savetxt(filename, data, delimiter=",", header=header) + if dry_run: + logger.info(f"Dry run: not exporting generic 1D data to {filename}") + else: + np.savetxt(filename, data, delimiter=",", header=header) -def export_resPES(uid, beamline_acronym="haxpes"): - catalog = initialize_tiled_client(beamline_acronym) - run = catalog[uid] +def export_resPES(uid, api_key=None, dry_run=False): + logger = get_run_logger() + run = get_run(uid, api_key=api_key) data_dictionary = get_resPES_data(run) @@ -85,24 +103,24 @@ def export_resPES(uid, beamline_acronym="haxpes"): if not exists(export_path): makedirs(export_path) filename = export_path + generate_file_name(run, "h5") - - f = h5py.File(filename, "a") - - datagroup = f.create_group("DataSets") - for key, value in data_dictionary["DataSets"].items(): - ds = datagroup.create_dataset(key, data=value) - - specgroup = f.create_group("Signals") - for key, value in data_dictionary["Signals"].items(): - s = specgroup.create_dataset(key, data=value) - s.attrs["X Axis"] = "Photon Energy" - - axisgroup = f.create_group("PlotAxes") - for key, value in data_dictionary["Axes"].items(): - a = axisgroup.create_dataset(key, data=value) - - metagroup = f.create_group("Meta") - for key, value in data_dictionary["Metadata"].items(): - metagroup.attrs[key] = value - - f.close() + if dry_run: + logger.info(f"Dry run: not exporting ResPES data to {filename}") + else: + with h5py.File(filename, "a") as f: + + datagroup = f.create_group("DataSets") + for key, value in data_dictionary["DataSets"].items(): + ds = datagroup.create_dataset(key, data=value) + + specgroup = f.create_group("Signals") + for key, value in data_dictionary["Signals"].items(): + s = specgroup.create_dataset(key, data=value) + s.attrs["X Axis"] = "Photon Energy" + + axisgroup = f.create_group("PlotAxes") + for key, value in data_dictionary["Axes"].items(): + a = axisgroup.create_dataset(key, data=value) + + metagroup = f.create_group("Meta") + for key, value in data_dictionary["Metadata"].items(): + metagroup.attrs[key] = value diff --git a/general_exporter.py b/general_exporter.py index 1b61efc..3431ba6 100644 --- a/general_exporter.py +++ b/general_exporter.py @@ -1,7 +1,7 @@ # from export_tools import get_proposal_path -from prefect import flow +from prefect import flow, get_run_logger -from export_tools import initialize_tiled_client +from export_tools import get_run from file_exporter import ( export_generic_1D, export_peak_xps, @@ -11,47 +11,29 @@ ) -def export_switchboard(uid, beamline_acronym="haxpes"): - c = initialize_tiled_client(beamline_acronym) - run = c[uid] +@flow +def export_switchboard(uid, api_key=None, dry_run=False): + logger = get_run_logger() + run = get_run(uid, api_key=api_key) if run.stop["exit_status"] != "abort": if run.start["autoexport"]: if "scantype" in run.start.keys(): if run.start["scantype"] == "xps": if run.start["analyzer_type"] == "peak": - peak_export(uid) + export_peak_xps(uid, api_key=api_key, dry_run=dry_run) elif run.start["analyzer_type"] == "ses": - ses_export(uid) + export_ses_xps(uid, api_key=api_key, dry_run=dry_run) + else: + logger.info( + f"Unknown analyzer type {run.start['analyzer_type']} for XPS scan" + ) elif run.start["scantype"] == "xas": - xas_export(uid) + export_xas(uid, api_key=api_key, dry_run=dry_run) elif run.start["scantype"] == "resPES": - resPES_export(uid) + export_resPES(uid, api_key=api_key, dry_run=dry_run) else: - generic_export(uid) + export_generic_1D(uid, api_key=api_key, dry_run=dry_run) else: - generic_export(uid) - - -@flow -def xas_export(uid, beamline_acronym="haxpes"): - export_xas(uid, beamline_acronym) - - -@flow -def peak_export(uid, beamline_acronym="haxpes"): - export_peak_xps(uid, beamline_acronym) - - -@flow -def generic_export(uid, beamline_acronym="haxpes"): - export_generic_1D(uid, beamline_acronym) - - -@flow -def ses_export(uid, beamline_acronym="haxpes"): - export_ses_xps(uid, beamline_acronym) - - -@flow -def resPES_export(uid, beamline_acronym="haxpes"): - export_resPES(uid, beamline_acronym) + export_generic_1D(uid, api_key=api_key, dry_run=dry_run) + else: + logger.info("Run was aborted, skipping exports") diff --git a/pixi.lock b/pixi.lock index 1b3654b..2af3086 100644 --- a/pixi.lock +++ b/pixi.lock @@ -55,6 +55,8 @@ environments: - conda: https://conda.anaconda.org/conda-forge/linux-64/c-ares-1.34.6-hb03c661_0.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/c-blosc2-2.22.0-hc31b594_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/ca-certificates-2026.1.4-hbd8a1cb_0.conda + - conda: https://conda.anaconda.org/conda-forge/noarch/cached-property-1.5.2-hd8ed1ab_1.tar.bz2 + - conda: https://conda.anaconda.org/conda-forge/noarch/cached_property-1.5.2-pyha770c72_1.tar.bz2 - conda: https://conda.anaconda.org/conda-forge/noarch/cachetools-6.2.4-pyhd8ed1ab_0.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/cairo-1.18.4-he90730b_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/certifi-2026.1.4-pyhd8ed1ab_0.conda @@ -108,7 +110,9 @@ environments: - conda: https://conda.anaconda.org/conda-forge/linux-64/gts-0.7.6-h977cf35_4.conda - conda: https://conda.anaconda.org/conda-forge/noarch/h11-0.16.0-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/h2-4.3.0-pyhcf101f3_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/h5py-3.16.0-nompi_py313h253c126_100.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/harfbuzz-12.3.0-h6083320_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/hdf5-1.14.6-nompi_h19486de_106.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/hicolor-icon-theme-0.17-ha770c72_2.tar.bz2 - conda: https://conda.anaconda.org/conda-forge/noarch/hpack-4.1.0-pyhd8ed1ab_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/httpcore-1.0.9-pyh29332c3_0.conda @@ -135,6 +139,7 @@ environments: - conda: https://conda.anaconda.org/conda-forge/linux-64/ld_impl_linux-64-2.45-default_hbd61a6d_105.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/lerc-4.0.0-h0aef613_1.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/libabseil-20250512.1-cxx17_hba17884_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libaec-1.1.5-h088129d_0.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/libarrow-22.0.0-hb6ed5f4_6_cpu.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/libarrow-acero-22.0.0-h635bf11_6_cpu.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/libarrow-compute-22.0.0-h8c2c5c3_6_cpu.conda @@ -996,6 +1001,25 @@ packages: license: ISC size: 146519 timestamp: 1767500828366 +- conda: https://conda.anaconda.org/conda-forge/noarch/cached-property-1.5.2-hd8ed1ab_1.tar.bz2 + noarch: python + sha256: 561e6660f26c35d137ee150187d89767c988413c978e1b712d53f27ddf70ea17 + md5: 9b347a7ec10940d3f7941ff6c460b551 + depends: + - cached_property >=1.5.2,<1.5.3.0a0 + license: BSD-3-Clause + license_family: BSD + size: 4134 + timestamp: 1615209571450 +- conda: https://conda.anaconda.org/conda-forge/noarch/cached_property-1.5.2-pyha770c72_1.tar.bz2 + sha256: 6dbf7a5070cc43d90a1e4c2ec0c541c69d8e30a0e25f50ce9f6e4a432e42c5d7 + md5: 576d629e47797577ab0f1b351297ef4a + depends: + - python >=3.6 + license: BSD-3-Clause + license_family: BSD + size: 11065 + timestamp: 1615209567874 - conda: https://conda.anaconda.org/conda-forge/noarch/cachetools-6.2.4-pyhd8ed1ab_0.conda sha256: e00325243791f4337d147224e4e1508de450aeeab1abc0470f2227748deddbfc md5: 629c8fd0c11eb853732608e2454abf8e @@ -1697,6 +1721,21 @@ packages: license_family: MIT size: 95967 timestamp: 1756364871835 +- conda: https://conda.anaconda.org/conda-forge/linux-64/h5py-3.16.0-nompi_py313h253c126_100.conda + sha256: 0a05551c6007d680e8f1a9b121e6ca100b950151437f3bb1639e1acec2dd02e5 + md5: 0f91633d043df7c2fd077944ef483546 + depends: + - __glibc >=2.17,<3.0.a0 + - cached-property + - hdf5 >=1.14.6,<1.14.7.0a0 + - libgcc >=14 + - numpy >=1.23,<3 + - python >=3.13,<3.14.0a0 + - python_abi 3.13.* *_cp313 + license: BSD-3-Clause + license_family: BSD + size: 1332964 + timestamp: 1774320675495 - conda: https://conda.anaconda.org/conda-forge/linux-64/harfbuzz-12.3.0-h6083320_0.conda sha256: eb0ff4632c76d5840ad8f509dc55694f79d9ac9bea5529944640e28e490361b0 md5: 1ea5ed29aea252072b975a232b195146 @@ -1716,6 +1755,23 @@ packages: license_family: MIT size: 2062122 timestamp: 1766937132307 +- conda: https://conda.anaconda.org/conda-forge/linux-64/hdf5-1.14.6-nompi_h19486de_106.conda + sha256: 1fc50ce3b86710fba3ec9c5714f1612b5ffa4230d70bfe43e2a1436eacba1621 + md5: c223ee1429ba538f3e48cfb4a0b97357 + depends: + - __glibc >=2.17,<3.0.a0 + - libaec >=1.1.5,<2.0a0 + - libcurl >=8.18.0,<9.0a0 + - libgcc >=14 + - libgfortran + - libgfortran5 >=14.3.0 + - libstdcxx >=14 + - libzlib >=1.3.1,<2.0a0 + - openssl >=3.5.5,<4.0a0 + license: BSD-3-Clause + license_family: BSD + size: 3708864 + timestamp: 1770390337946 - conda: https://conda.anaconda.org/conda-forge/linux-64/hicolor-icon-theme-0.17-ha770c72_2.tar.bz2 sha256: 336f29ceea9594f15cc8ec4c45fdc29e10796573c697ee0d57ebb7edd7e92043 md5: bbf6f174dcd3254e19a2f5d2295ce808 @@ -2000,6 +2056,17 @@ packages: license_family: Apache size: 1310612 timestamp: 1750194198254 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libaec-1.1.5-h088129d_0.conda + sha256: 822e4ae421a7e9c04e841323526321185f6659222325e1a9aedec811c686e688 + md5: 86f7414544ae606282352fa1e116b41f + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + - libstdcxx >=14 + license: BSD-2-Clause + license_family: BSD + size: 36544 + timestamp: 1769221884824 - conda: https://conda.anaconda.org/conda-forge/linux-64/libarrow-22.0.0-hb6ed5f4_6_cpu.conda build_number: 6 sha256: bab5fcb86cf28a3de65127fbe61ed9194affc1cf2d9b60a9e09af8a8b96b93e3 diff --git a/pixi.toml b/pixi.toml index ce71111..e396c6b 100644 --- a/pixi.toml +++ b/pixi.toml @@ -10,3 +10,4 @@ tiled-client = ">=0.2.3" bluesky-tiled-plugins = ">=2" prefect-docker = "*" numpy = "*" +h5py = "*" diff --git a/prefect.yaml b/prefect.yaml index a390f3b..ad3310f 100644 --- a/prefect.yaml +++ b/prefect.yaml @@ -23,7 +23,7 @@ pull: # the deployments section allows you to provide configuration for deploying flows deployments: - name: haxpes-end-of-run-workflow-docker - version: 0.1.2 + version: 0.1.3 tags: - haxpes - sst @@ -35,14 +35,12 @@ deployments: name: haxpes-work-pool-docker work_queue_name: job_variables: - env: - TILED_SITE_PROFILES: /nsls2/software/etc/tiled/profiles image: ghcr.io/nsls2/haxpes-workflows:main image_pull_policy: Always network: slirp4netns volumes: - /nsls2/data/sst/proposals:/nsls2/data/sst/proposals - - /nsls2/software/etc/tiled:/nsls2/software/etc/tiled + - /srv/prefect3-docker-worker-haxpes/app:/srv container_create_kwargs: userns_mode: "keep-id:uid=402953,gid=402953" # workflow-sst:workflow-sst auto_remove: true