feat: support for chunked arrays #58
Conversation
Dask tests fail, probably because:
I will try it later, but one lead to fix this would be to open a fresh reader per call, like so:

import numpy as np
from xarray.backends import BackendArray


class SelafinLazyArray(BackendArray):
    def __init__(self, slf_reader_or_path, var, dtype, shape):
        # Accept either a reader instance or a path/reader-factory
        self.slf_reader = slf_reader_or_path
        self.var = var
        self.dtype = dtype
        self.shape = shape
    def _open_reader(self):
        # If self.slf_reader is already a path or factory, open a fresh reader
        if isinstance(self.slf_reader, str):
            return SelafinReader(self.slf_reader)  # <-- adapt to your reader constructor
        # If provided a reader instance, try to get its path attribute
        if hasattr(self.slf_reader, "filepath"):
            return SelafinReader(self.slf_reader.filepath)
        # Otherwise, fall back to assuming 'slf_reader' is already a lightweight factory
        return self.slf_reader
    def _raw_indexing_method(self, key):
        ...
        for it, t in enumerate(time_indices):
            t_int = int(t)
            # open a fresh reader for this task
            reader = self._open_reader()
            try:
                temp = np.asarray(reader.read_var_in_frame(t_int, self.var))
            finally:
                # close if the reader supports close()
                if hasattr(reader, "close"):
                    try:
                        reader.close()
                    except Exception:
                        pass
        ...

@lucduron what do you think?
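For illustration only, a minimal sketch of how such a lazy array might then be handed to xarray; the file name, the variable name "S" and the use of LazilyIndexedArray are assumptions, not code from this PR:

import numpy as np
from xarray.core import indexing

# Pass the plain file path so _raw_indexing_method can open (and close) a
# short-lived reader inside each dask task instead of sharing one handle.
lazy = SelafinLazyArray(
    "r2d_large.slf",      # path string, handled by _open_reader above
    var="S",              # placeholder variable name
    dtype=np.float32,
    shape=(409, 215362),  # (frames, nodes); sizes of the benchmark file below
)
data = indexing.LazilyIndexedArray(lazy)  # lazy wrapper used by xarray backends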
- Use a dask.utils.SerializableLock instance to avoid conflicts while reading (a sketch follows this list)
- Refactor some variables in loops to be more precise
- Add a SerafinRequestError in case of buffer truncation (in Serafin.read_var_in_frame)
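As a minimal sketch of the first point (not the exact code from the commits; the helper name read_frame_locked is a placeholder, while read_var_in_frame is the existing Serafin method):

import numpy as np

from dask.utils import SerializableLock

# A single lock shared by all read tasks; unlike a plain threading.Lock,
# a SerializableLock survives pickling when dask serializes the task graph.
SLF_LOCK = SerializableLock()


def read_frame_locked(reader, time_index, var):
    # Only one task at a time touches the underlying file handle.
    with SLF_LOCK:
        return np.asarray(reader.read_var_in_frame(time_index, var))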
Hi @tomsail, Sorry for the delay, I had a look at the problem only today. The problem is not linked to the Serafin class, as I can use it with multiprocessing/threading on the same file without problems. But with xarray, it seems that some locks are expected, and I tried to follow the documentation. The performance should now be checked to know whether this dask implementation is worthwhile (of course, we currently read the same buffer multiple times when chunking on nodes, for example). Hope it helps,
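As an illustration of that last point, a minimal sketch (the dimension name "time" and the chunk size are assumptions): chunking along time only lets each frame buffer be read once, whereas chunking along the node dimension forces several tasks to re-read the same frame.

import xarray as xr

from xarray_selafin.xarray_backend import SelafinBackendEntrypoint

# Hypothetical example: chunk only along the (assumed) "time" dimension so
# that every frame buffer is read by a single task.
ds = xr.open_dataset(
    "r2d_large.slf",
    engine=SelafinBackendEntrypoint,
    lang="fr",
    chunks={"time": 10},
)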
Hi @tomsail, Could you try to check if #58 is now fixed with my corrections? Moreover, I did 2 additional commits in this branch to:
I am aware that this is not linked to chunked arrays, so feel free to cherry-pick my commits into another branch/PR. Best Regards,
Hi @tomsail, I added 5 minor commits today.
Hi @tomsail, I did a quick benchmark on a large 2D Selafin file (3.62 GB) to quantify the gain in IO (reading/writing frames in series). Here is the snippet used:

import os
from time import perf_counter, sleep
import xarray as xr
from xarray_selafin.Serafin import Read, Write
from xarray_selafin.xarray_backend import SelafinBackendEntrypoint
slf_in = "r2d_large.slf"  # 3.62 GB (2D file with 215362 nodes, 429285 elements, 409 frames, 11 variables)
slf_out = "temp_out.slf"
lang = "fr"
def remove_slf_out():
    if os.path.exists(slf_out):
        os.remove(slf_out)
        sleep(2.0)


def compare_slf():
    with open(slf_in, "rb") as in_slf1, open(slf_out, "rb") as in_slf2:
        assert in_slf1.read() == in_slf2.read()


remove_slf_out()
t1 = perf_counter()
with Read(slf_in, lang) as resin:
    with Write(slf_out, lang, overwrite=True) as resout:
        resin.read_header()
        resin.get_time()
        out_header = resin.header
        resout.write_header(out_header)
        for time_index, time in enumerate(resin.time):
            resout.write_entire_frame(out_header, time, resin.read_vars_in_frame(time_index=time_index))
t2 = perf_counter()
print("Pure PyTelTools: ", t2 - t1)

compare_slf()
remove_slf_out()

t3 = perf_counter()
ds = xr.open_dataset(
    slf_in,
    disable_lock=True,  # remove this arg for main
    lazy_loading=False,
    engine=SelafinBackendEntrypoint,
    lang=lang,
)
ds.selafin.write(slf_out)
t4 = perf_counter()
print("xarray without lazy_loading: ", t4 - t3)

compare_slf()
remove_slf_out()
t5 = perf_counter()
ds = xr.open_dataset(
    slf_in,
    disable_lock=True,  # remove this arg for main
    lazy_loading=True,
    engine=SelafinBackendEntrypoint,
    lang=lang,
)
ds.selafin.write(slf_out)
t6 = perf_counter()
print("xarray with lazy_loading: ", t6 - t5)

compare_slf()

Below are the results in detail:
For dask, it is now working but probably not fully optimized/adapted; I will not have time to investigate in the short term. I do not plan to commit other corrections or improvements in the coming days ;). Luc
For reference: #57