Skip to content

Commit 4f5acbb

Browse files
committed
adding bufr examples to obs-data building
1 parent 8d6ccd6 commit 4f5acbb

8 files changed

+848
-10
lines changed

tests/create/bufr2df.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import eccodes
2+
import numpy as np
3+
import pandas as pd
4+
import tqdm
5+
from earthkit.data.readers.bufr.bufr import BUFRReader
6+
from gribapi.errors import KeyValueNotFoundError
7+
8+
9+
def filter_values(df: pd.DataFrame, filters: dict) -> pd.DataFrame:
10+
"""Filter the DataFrame based on the specified conditions"""
11+
for col, condition in filters.items():
12+
if isinstance(condition, str):
13+
condition = eval(condition)
14+
if callable(condition):
15+
df = df[df[col].apply(condition)]
16+
elif isinstance(condition, slice):
17+
start, stop = condition.start, condition.stop
18+
query_str = f"({start} <= {col}) & ({col} < {stop})"
19+
df = df.query(query_str)
20+
elif isinstance(condition, (list, set)):
21+
df = df[df[col].isin(condition)]
22+
else:
23+
raise ValueError(f"Invalid condition for column '{col}': {condition}")
24+
return df
25+
26+
27+
def bufr_get_array(bid: int, element: str, typ: type, nsubsets: int, missing_val=np.nan) -> np.ndarray:
28+
"""Wrapper for codes_get_array to work around the inconsistent handling of arrays in eccodes when data is constant"""
29+
try:
30+
arr = eccodes.codes_get_array(bid, element, typ)
31+
if len(arr) == 1:
32+
arr = np.ones(nsubsets, dtype=typ) * arr
33+
except KeyValueNotFoundError:
34+
arr = np.ones(nsubsets, dtype=typ) * missing_val
35+
return arr
36+
37+
38+
def extract_datetimes(bid: int, nreports: int) -> pd.DataFrame:
39+
"""Extracts and parses the date/time info from a bufr message
40+
and returns as an array of datetime objects
41+
"""
42+
df = pd.DataFrame(
43+
dict(
44+
years=bufr_get_array(bid, "year", int, nreports),
45+
months=bufr_get_array(bid, "month", int, nreports),
46+
days=bufr_get_array(bid, "day", int, nreports),
47+
hours=bufr_get_array(bid, "hour", int, nreports),
48+
minutes=bufr_get_array(bid, "minute", int, nreports),
49+
seconds=bufr_get_array(bid, "second", int, nreports, missing_val=0),
50+
)
51+
)
52+
# Create the datetime series using pandas
53+
datetimes = pd.to_datetime(df)
54+
return datetimes
55+
56+
57+
def get_msg(f, i, per_report_dict, per_datum_dict=None, filters=None) -> pd.DataFrame:
58+
bid = eccodes.codes_bufr_new_from_file(f)
59+
eccodes.codes_set(bid, "unpack", 1)
60+
nreports = eccodes.codes_get(bid, "numberOfSubsets")
61+
62+
data_dict = {
63+
item: bufr_get_array(bid, col, float, nreports).astype(np.float32) for col, item in per_report_dict.items()
64+
}
65+
data_dict["times"] = extract_datetimes(bid, nreports)
66+
67+
if per_datum_dict:
68+
for col, sub_dict in per_datum_dict.items():
69+
ndatum = eccodes.codes_get_size(bid, next(iter(per_datum_dict))) // nreports
70+
vals = bufr_get_array(bid, col, float, nreports * ndatum).astype(np.float32)
71+
try:
72+
vals_2d = vals.reshape(ndatum, nreports).T
73+
except ValueError as e:
74+
if "cannot reshape array" in str(e):
75+
import warnings
76+
77+
warnings.warn(
78+
f"Reshape error in file {f}, message {i}: Cannot reshape array of size {len(vals)} "
79+
f"into shape ({ndatum}, {nreports}). Skipping this message.",
80+
RuntimeWarning,
81+
)
82+
eccodes.codes_release(bid)
83+
return None
84+
else:
85+
raise # Re-raise if it's a different ValueError
86+
87+
for col_rename, slice_str in sub_dict.items():
88+
vals_col = vals_2d[:, eval(slice_str)]
89+
for i in range(vals_col.shape[1]):
90+
data_dict[f"{col_rename}_{i+1}"] = vals_col[:, i]
91+
92+
df = pd.DataFrame(data_dict)
93+
94+
if filters:
95+
df = filter_values(df, filters)
96+
97+
eccodes.codes_release(bid)
98+
return df
99+
100+
101+
def bufr2df(
102+
ekd_ds: BUFRReader,
103+
per_report: dict,
104+
per_datum: dict = None,
105+
filter: dict = None,
106+
) -> pd.DataFrame:
107+
"""Extracts data from a BUFR file into a pandas DataFrame
108+
-info on what to extract (and how it should be named in the dataframe) are
109+
provided by input dictionaries; one at the per-report level and another for the per-datum
110+
"""
111+
fname = ekd_ds.path
112+
with open(fname, "rb") as f:
113+
nmessages = eccodes.codes_count_in_file(f)
114+
bar = tqdm.tqdm(
115+
iterable=range(nmessages),
116+
desc="Processing bufr messages...",
117+
mininterval=20.0,
118+
)
119+
df_lst = [get_msg(f, i, per_report, per_datum, filter) for i in bar]
120+
df = pd.concat(df_lst)
121+
df = df.sort_values(by=["times"]).reset_index(drop=True)
122+
return df

0 commit comments

Comments
 (0)