Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions data_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@

from prefect import flow, get_run_logger, task

from export_tools import initialize_tiled_client
from export_tools import get_run


@task(retries=2, retry_delay_seconds=10)
def read_all_streams(uid, beamline_acronym="haxpes"):
@task
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@task
@task(retries=2, retry_delay_seconds=10)

def read_stream(run, stream):
stream_data = run[stream].read()
return stream_data


@flow(retries=2, retry_delay_seconds=10)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@flow(retries=2, retry_delay_seconds=10)
@flow

def data_validation(uid, api_key=None):
logger = get_run_logger()
tiled_client = initialize_tiled_client(beamline_acronym)
run = tiled_client[uid]
logger.info(f"Validating uid {run.start['uid']}")
run = get_run(uid, api_key=api_key)
logger.info(f"Validating uid {uid}")
start_time = time.monotonic()
for stream in run:
logger.info(f"{stream}:")
stream_start_time = time.monotonic()
stream_data = run[stream].read()
stream_data = read_stream(run, stream)
stream_elapsed_time = time.monotonic() - stream_start_time
logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
elapsed_time = time.monotonic() - start_time
logger.info(f"{elapsed_time = }")


@flow
def general_data_validation(uid, beamline_acronym="haxpes"):
read_all_streams(uid, beamline_acronym)
20 changes: 16 additions & 4 deletions end_of_run_workflow.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
import os

from dotenv import load_dotenv
from prefect import flow, get_run_logger, task

from data_validation import general_data_validation
from data_validation import data_validation
from general_exporter import export_switchboard


def get_api_key_from_env():
with open("/srv/container.secret", "r") as secrets:
load_dotenv(stream=secrets)
api_key = os.environ["TILED_API_KEY"]
return api_key


@task
def log_completion():
logger = get_run_logger()
logger.info("Complete")


@flow
def end_of_run_workflow(stop_doc):
def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
if api_key is None:
api_key = get_api_key_from_env()
uid = stop_doc["run_start"]
general_data_validation(uid, beamline_acronym="haxpes")
export_switchboard(uid, beamline_acronym="haxpes")
data_validation(uid, api_key=api_key)
export_switchboard(uid, api_key=api_key, dry_run=dry_run)
log_completion()
14 changes: 8 additions & 6 deletions export_tools.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import re

from numpy import column_stack, transpose
from prefect.blocks.system import Secret
from tiled.client import from_profile
from prefect import task
from tiled.client import from_uri


def get_proposal_path(run):
Expand Down Expand Up @@ -79,7 +79,7 @@ def get_general_metadata(run):
try:
metadata["Proposal"] = str(run.start["proposal"]["proposal_id"])
except:
metadata["Proposal"] = unknown
metadata["Proposal"] = "Unknown"

metadata["UID"] = get_md(run, "uid")
metadata["Start Date/Time"] = get_md(run, "start_datetime")
Expand Down Expand Up @@ -275,9 +275,11 @@ def get_generic_1d_data(run):
return data_array


def initialize_tiled_client(beamline_acronym):
api_key = Secret.load(f"tiled-{beamline_acronym}-api-key", _sync=True).get()
return from_profile("nsls2", api_key=api_key)[beamline_acronym]["raw"]
@task(retries=2, retry_delay_seconds=10)
def get_run(uid, api_key=None):
tiled_client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
run = tiled_client["haxpes/raw"][uid]
return run


def generate_file_name(run, extension):
Expand Down
112 changes: 65 additions & 47 deletions file_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
from os import makedirs
from os.path import exists, splitext

import h5py
import numpy as np
from prefect import get_run_logger, task

from export_tools import *


def export_peak_xps(uid, beamline_acronym="haxpes"):
catalog = initialize_tiled_client(beamline_acronym)
run = catalog[uid]
@task
def export_peak_xps(uid, api_key=None, dry_run=False):
logger = get_run_logger()
run = get_run(uid, api_key=api_key)

metadata = get_metadata_xps(run)
header = make_header(metadata, "xps")
Expand All @@ -19,12 +22,16 @@ def export_peak_xps(uid, beamline_acronym="haxpes"):
if not exists(export_path):
makedirs(export_path)
filename = export_path + generate_file_name(run, "csv")
np.savetxt(filename, data, delimiter=",", header=header)
if dry_run:
logger.info(f"Dry run: not exporting peak XPS data to {filename}")
else:
np.savetxt(filename, data, delimiter=",", header=header)


def export_ses_xps(uid, beamline_acronym="haxpes"):
catalog = initialize_tiled_client(beamline_acronym)
run = catalog[uid]
@task
def export_ses_xps(uid, api_key=None, dry_run=False):
logger = get_run_logger()
run = get_run(uid, api_key=api_key)

metadata = get_metadata_xps(run)
header = make_header(metadata, "xps")
Expand All @@ -35,17 +42,21 @@ def export_ses_xps(uid, beamline_acronym="haxpes"):
makedirs(export_path)
filename = generate_file_name(run, "md")
out_path = export_path + filename
write_header_only(out_path, header)
ses_files = glob(f"{ses_path}*_{scan_id}_*")
for ses_file in ses_files:
ext = splitext(ses_file)[1]
out_path = export_path + generate_file_name(run, ext)
shutil.copy(ses_file, out_path)


def export_xas(uid, beamline_acronym="haxpes"):
catalog = initialize_tiled_client(beamline_acronym)
run = catalog[uid]
if dry_run:
logger.info(f"Dry run: not exporting SES XPS data to {export_path}")
else:
write_header_only(out_path, header)
ses_files = glob(f"{ses_path}*_{scan_id}_*")
for ses_file in ses_files:
ext = splitext(ses_file)[1]
out_path = export_path + generate_file_name(run, ext)
shutil.copy(ses_file, out_path)


@task
def export_xas(uid, api_key=None, dry_run=False):
logger = get_run_logger()
run = get_run(uid, api_key=api_key)

detlist = run.start["detectors"]
metadata = get_general_metadata(run)
Expand All @@ -56,12 +67,16 @@ def export_xas(uid, beamline_acronym="haxpes"):
if not exists(export_path):
makedirs(export_path)
filename = export_path + generate_file_name(run, "csv")
np.savetxt(filename, data, delimiter=",", header=header)
if dry_run:
logger.info(f"Dry run: not exporting XAS data to {filename}")
else:
np.savetxt(filename, data, delimiter=",", header=header)


def export_generic_1D(uid, beamline_acronym="haxpes"):
catalog = initialize_tiled_client(beamline_acronym)
run = catalog[uid]
@task
def export_generic_1D(uid, api_key=None, dry_run=False):
logger = get_run_logger()
run = get_run(uid, api_key=api_key)

detlist = run.start["detectors"]
metadata = get_general_metadata(run)
Expand All @@ -72,37 +87,40 @@ def export_generic_1D(uid, beamline_acronym="haxpes"):
if not exists(export_path):
makedirs(export_path)
filename = export_path + generate_file_name(run, "csv")
np.savetxt(filename, data, delimiter=",", header=header)
if dry_run:
logger.info(f"Dry run: not exporting generic 1D data to {filename}")
else:
np.savetxt(filename, data, delimiter=",", header=header)


def export_resPES(uid, beamline_acronym="haxpes"):
catalog = initialize_tiled_client(beamline_acronym)
run = catalog[uid]
def export_resPES(uid, api_key=None, dry_run=False):
logger = get_run_logger()
run = get_run(uid, api_key=api_key)

data_dictionary = get_resPES_data(run)

export_path = get_proposal_path(run) + "ResPES/"
if not exists(export_path):
makedirs(export_path)
filename = export_path + generate_file_name(run, "h5")

f = h5py.File(filename, "a")

datagroup = f.create_group("DataSets")
for key, value in data_dictionary["DataSets"].items():
ds = datagroup.create_dataset(key, data=value)

specgroup = f.create_group("Signals")
for key, value in data_dictionary["Signals"].items():
s = specgroup.create_dataset(key, data=value)
s.attrs["X Axis"] = "Photon Energy"

axisgroup = f.create_group("PlotAxes")
for key, value in data_dictionary["Axes"].items():
a = axisgroup.create_dataset(key, data=value)

metagroup = f.create_group("Meta")
for key, value in data_dictionary["Metadata"].items():
metagroup.attrs[key] = value

f.close()
if dry_run:
logger.info(f"Dry run: not exporting ResPES data to {filename}")
else:
with h5py.File(filename, "a") as f:

datagroup = f.create_group("DataSets")
for key, value in data_dictionary["DataSets"].items():
ds = datagroup.create_dataset(key, data=value)

specgroup = f.create_group("Signals")
for key, value in data_dictionary["Signals"].items():
s = specgroup.create_dataset(key, data=value)
s.attrs["X Axis"] = "Photon Energy"

axisgroup = f.create_group("PlotAxes")
for key, value in data_dictionary["Axes"].items():
a = axisgroup.create_dataset(key, data=value)

metagroup = f.create_group("Meta")
for key, value in data_dictionary["Metadata"].items():
metagroup.attrs[key] = value
50 changes: 14 additions & 36 deletions general_exporter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# from export_tools import get_proposal_path
from prefect import flow
from prefect import flow, get_run_logger

from export_tools import initialize_tiled_client
from export_tools import get_run
from file_exporter import (
export_generic_1D,
export_peak_xps,
Expand All @@ -11,47 +11,25 @@
)


def export_switchboard(uid, beamline_acronym="haxpes"):
c = initialize_tiled_client(beamline_acronym)
run = c[uid]
@flow
def export_switchboard(uid, api_key=None, dry_run=False):
logger = get_run_logger()
run = get_run(uid, api_key=api_key)
if run.stop["exit_status"] != "abort":
if run.start["autoexport"]:
if "scantype" in run.start.keys():
if run.start["scantype"] == "xps":
if run.start["analyzer_type"] == "peak":
peak_export(uid)
export_peak_xps(uid, api_key=api_key, dry_run=dry_run)
elif run.start["analyzer_type"] == "ses":
ses_export(uid)
export_ses_xps(uid, api_key=api_key, dry_run=dry_run)
elif run.start["scantype"] == "xas":
xas_export(uid)
export_xas(uid, api_key=api_key, dry_run=dry_run)
elif run.start["scantype"] == "resPES":
resPES_export(uid)
export_resPES(uid, api_key=api_key, dry_run=dry_run)
else:
generic_export(uid)
export_generic_1D(uid, api_key=api_key, dry_run=dry_run)
else:
generic_export(uid)


@flow
def xas_export(uid, beamline_acronym="haxpes"):
export_xas(uid, beamline_acronym)


@flow
def peak_export(uid, beamline_acronym="haxpes"):
export_peak_xps(uid, beamline_acronym)


@flow
def generic_export(uid, beamline_acronym="haxpes"):
export_generic_1D(uid, beamline_acronym)


@flow
def ses_export(uid, beamline_acronym="haxpes"):
export_ses_xps(uid, beamline_acronym)


@flow
def resPES_export(uid, beamline_acronym="haxpes"):
export_resPES(uid, beamline_acronym)
export_generic_1D(uid, api_key=api_key, dry_run=dry_run)
else:
logger.info("Run was aborted, skipping exports")
Loading
Loading