diff --git a/neo/io/nestio.py b/neo/io/nestio.py
index f2ed17b65..f13cd217e 100644
--- a/neo/io/nestio.py
+++ b/neo/io/nestio.py
@@ -7,12 +7,11 @@
 Supported: Read
 
-Authors: Julia Sprenger, Maximilian Schmidt, Johanna Senk
+Authors: Julia Sprenger, Maximilian Schmidt, Johanna Senk,
+Simon Essink, Robin Gutzen, Jasper Albers, Aitor Morales-Gregorio
 """
 
-# needed for Python3 compatibility
-
 import os.path
 import warnings
 from datetime import datetime
@@ -22,7 +21,9 @@
 from neo.io.baseio import BaseIO
 from neo.core import Block, Segment, SpikeTrain, AnalogSignal
 
-value_type_dict = {"V": pq.mV, "I": pq.pA, "g": pq.CompoundUnit("10^-9*S"), "no type": pq.dimensionless}
+value_type_dict = {"V": pq.mV, "I": pq.pA,
+                   "g": pq.CompoundUnit("10^-9*S"),
+                   "no type": pq.dimensionless}
 
 
 class NestIO(BaseIO):
@@ -54,35 +55,36 @@ class NestIO(BaseIO):
 
     write_params = None  # writing is not supported
 
-    name = "nest"
-    extensions = ["gdf", "dat"]
-    mode = "file"
+    name = 'nest'
+    supported_target_objects = ['SpikeTrain', 'AnalogSignal']
+    mode = 'file'
 
-    def __init__(self, filenames=None):
+    def __init__(self, filenames=None, target_object='SpikeTrain', **kwargs):
         """
         Parameters
         ----------
            filenames: string or list of strings, default=None
                The filename or list of filenames to load.
+           target_object: string, default='SpikeTrain'
+               The type of neo object to read from the input files.
+               Options are: 'SpikeTrain', 'AnalogSignal'
+           kwargs: dict
+               Keyword arguments that are passed on to `numpy.loadtxt`; see
+               https://numpy.org/devdocs/reference/generated/numpy.loadtxt.html
         """
+        if target_object not in self.supported_target_objects:
+            raise ValueError(f'{target_object} is not a valid object type. '
+                             f'Valid values are {self.supported_target_objects}.')
+        # Ensure filenames is always a list
         if isinstance(filenames, str):
             filenames = [filenames]
 
+        # Store the inputs as attributes
         self.filenames = filenames
-        self.avail_formats = {}
-        self.avail_IOs = {}
-
-        for filename in filenames:
-            path, ext = os.path.splitext(filename)
-            ext = ext.strip(".")
-            if ext in self.extensions:
-                if ext in self.avail_IOs:
-                    raise ValueError(
-                        f'Received multiple files with "{ext}" ' "extension. Can only load single file of " "this type."
-                    )
-                self.avail_IOs[ext] = ColumnIO(filename)
-            self.avail_formats[ext] = path
+        self.target_object = target_object
+        # One ColumnIO reader per file; kwargs are forwarded to numpy.loadtxt
+        self.IOs = [ColumnIO(filename, **kwargs) for filename in filenames]
 
     def __read_analogsignals(
         self,
@@ -101,9 +103,6 @@ def __read_analogsignals(
         Internal function called by read_analogsignal() and read_segment().
         """
-        if "dat" not in self.avail_formats:
-            raise ValueError("Can not load analogsignals. No DAT file " "provided.")
-
         # checking gid input parameters
         gid_list, id_column = self._check_input_gids(gid_list, id_column)
         # checking time input parameters
@@ -116,15 +115,19 @@ def __read_analogsignals(
         # defining standard column order for internal usage
         # [id_column, time_column, value_column1, value_column2, ...]
-        column_ids = [id_column, time_column] + value_columns
+        column_ids = [id_column, time_column]
+        if value_columns is not None:
+            column_ids += value_columns
         for i, cid in enumerate(column_ids):
             if cid is None:
                 column_ids[i] = -1
 
         # assert that no single column is assigned twice
-        column_list = [id_column, time_column] + value_columns
+        column_list = [id_column, time_column]
+        if value_columns is not None:
+            column_list += value_columns
         column_list_no_None = [c for c in column_list if c is not None]
-        if len(np.unique(column_list_no_None)) < len(column_list_no_None):
+        if len(set(column_list_no_None)) < len(column_list_no_None):
             raise ValueError(
                 "One or more columns have been specified to contain "
-                "the same data. Columns were specified to {column_list_no_None}."
+                f"the same data. Columns were specified as {column_list_no_None}."
             )
 
         # extracting condition and sorting parameters for raw data loading
-        (condition, condition_column, sorting_column) = self._get_conditions_and_sorting(
-            id_column, time_column, gid_list, t_start, t_stop
-        )
-        # loading raw data columns
-        data = self.avail_IOs["dat"].get_columns(
-            column_ids=column_ids,
-            condition=condition,
-            condition_column=condition_column,
-            sorting_columns=sorting_column,
-        )
+        (condition, condition_column,
+         sorting_column) = self._get_conditions_and_sorting(id_column,
+                                                            time_column,
+                                                            gid_list,
+                                                            t_start,
+                                                            t_stop)
 
-        sampling_period = self._check_input_sampling_period(sampling_period, time_column, time_unit, data)
         analogsignal_list = []
 
+        for col in self.IOs:
+
+            # loading raw data columns
+            data = col.get_columns(
+                column_ids=column_ids,
+                condition=condition,
+                condition_column=condition_column,
+                sorting_columns=sorting_column)
+
+            sampling_period = self._check_input_sampling_period(
+                sampling_period,
+                time_column,
+                time_unit,
+                data)
+
+            # extracting complete gid list for anasig generation
+            if not gid_list and id_column is not None:
+                current_gid_list = np.unique(data[:, id_column])
+            else:
+                current_gid_list = gid_list
 
-        # extracting complete gid list for anasig generation
-        if (gid_list == []) and id_column is not None:
-            gid_list = np.unique(data[:, id_column])
-
-        # generate analogsignals for each neuron ID
-        for i in gid_list:
-            selected_ids = self._get_selected_ids(i, id_column, time_column, t_start, t_stop, time_unit, data)
+            # generate analogsignals for each neuron ID
+            for i in current_gid_list:
+                selected_ids = self._get_selected_ids(
+                    i, id_column, time_column, t_start, t_stop, time_unit,
+                    data)
 
-            # extract starting time of analogsignal
-            if (time_column is not None) and data.size:
-                anasig_start_time = data[selected_ids[0], 1] * time_unit
-            else:
-                # set t_start equal to sampling_period because NEST starts
-                # recording only after 1 sampling_period
-                anasig_start_time = 1.0 * sampling_period
-
-            # create one analogsignal per value column requested
-            for v_id, value_column in enumerate(value_columns):
-                signal = data[selected_ids[0] : selected_ids[1], value_column]
-
-                # create AnalogSignal objects and annotate them with
-                # the neuron ID
-                analogsignal_list.append(
-                    AnalogSignal(
-                        signal * value_units[v_id],
-                        sampling_period=sampling_period,
-                        t_start=anasig_start_time,
-                        id=i,
-                        type=value_types[v_id],
-                    )
-                )
-                # check for correct length of analogsignal
-                assert analogsignal_list[-1].t_stop == anasig_start_time + len(signal) * sampling_period
+                # extract starting time of analogsignal
+                if (time_column is not None) and data.size:
+                    anasig_start_time = data[selected_ids[0], time_column] * time_unit
+                else:
+                    # set t_start equal to sampling_period because NEST starts
+                    # recording only after 1 sampling_period
+                    anasig_start_time = 1. * sampling_period
+
+                if value_columns is not None:
+                    # create one analogsignal per value column requested
+                    for v_id, value_column in enumerate(value_columns):
+                        signal = data[
+                            selected_ids[0]:selected_ids[1], value_column]
+
+                        # create AnalogSignal objects and annotate them with
+                        # the neuron ID
+                        analogsignal_list.append(AnalogSignal(
+                            signal * value_units[v_id],
+                            sampling_period=sampling_period,
+                            t_start=anasig_start_time,
+                            id=i,
+                            source_file=col.filename,
+                            type=value_types[v_id]))
+                        # check for correct length of analogsignal
+                        assert (analogsignal_list[-1].t_stop
+                                == anasig_start_time + len(signal) *
+                                sampling_period)
         return analogsignal_list
 
     def __read_spiketrains(self, gdf_id_list, time_unit, t_start, t_stop, id_column, time_column, **args):
@@ -186,10 +204,6 @@ def __read_spiketrains(self, gdf_id_list, time_unit, t_start, t_stop, id_column,
         Internal function for reading multiple spiketrains at once.
         This function is called by read_spiketrain() and read_segment().
         """
-
-        if "gdf" not in self.avail_IOs:
-            raise ValueError("Can not load spiketrains. No GDF file provided.")
-
         # assert that the file contains spike times
         if time_column is None:
             raise ValueError("Time column is None. No spike times to " "be read in.")
@@ -213,32 +227,46 @@ def __read_spiketrains(self, gdf_id_list, time_unit, t_start, t_stop, id_column,
             id_column, time_column, gdf_id_list, t_start, t_stop
         )
 
-        data = self.avail_IOs["gdf"].get_columns(
-            column_ids=column_ids,
-            condition=condition,
-            condition_column=condition_column,
-            sorting_columns=sorting_column,
-        )
-
-        # create a list of SpikeTrains for all neuron IDs in gdf_id_list
-        # assign spike times to neuron IDs if id_column is given
-        if id_column is not None:
-            if (gdf_id_list == []) and id_column is not None:
-                gdf_id_list = np.unique(data[:, id_column])
-
-            spiketrain_list = []
-            for nid in gdf_id_list:
-                selected_ids = self._get_selected_ids(nid, id_column, time_column, t_start, t_stop, time_unit, data)
-                times = data[selected_ids[0] : selected_ids[1], time_column]
-                spiketrain_list.append(
-                    SpikeTrain(times, units=time_unit, t_start=t_start, t_stop=t_stop, id=nid, **args)
-                )
-
-        # if id_column is not given, all spike times are collected in one
-        # spike train with id=None
-        else:
-            train = data[:, time_column]
-            spiketrain_list = [SpikeTrain(train, units=time_unit, t_start=t_start, t_stop=t_stop, id=None, **args)]
+        spiketrain_list = []
+        for col in self.IOs:
+
+            data = col.get_columns(
+                column_ids=column_ids,
+                condition=condition,
+                condition_column=condition_column,
+                sorting_columns=sorting_column)
+
+            # create a list of SpikeTrains for all neuron IDs in gdf_id_list
+            # assign spike times to neuron IDs if id_column is given
+            if id_column is not None:
+                if gdf_id_list == []:
+                    current_file_ids = np.unique(data[:, id_column])
+                else:
+                    current_file_ids = gdf_id_list
+
+                for nid in current_file_ids:
+                    selected_ids = self._get_selected_ids(nid, id_column,
+                                                          time_column, t_start,
+                                                          t_stop, time_unit,
+                                                          data)
+                    times = data[selected_ids[0]:selected_ids[1], time_column]
+                    spiketrain_list.append(SpikeTrain(times, units=time_unit,
+                                                      t_start=t_start,
+                                                      t_stop=t_stop,
+                                                      id=nid,
+                                                      source_file=col.filename,
+                                                      **args))
+
+            # if id_column is not given, all spike times are collected in one
+            # spike train with id=None
+            else:
+                train = data[:, time_column]
+                spiketrain_list.append(SpikeTrain(train, units=time_unit,
+                                                  t_start=t_start,
+                                                  t_stop=t_stop,
+                                                  id=None,
+                                                  source_file=col.filename,
+                                                  **args))
         return spiketrain_list
 
     def _check_input_times(self, t_start, t_stop, mandatory=True):
@@ -280,7 +308,10 @@ def _check_input_values_parameters(self, value_columns, value_types, value_units
             adjusted list of [value_columns, value_types, value_units]
         """
         if value_columns is None:
-            raise ValueError("No value column provided.")
+            warnings.warn('No value column was provided.')
+            value_types = None
+            value_units = None
+            return value_columns, value_types, value_units
         if isinstance(value_columns, int):
             value_columns = [value_columns]
         if value_types is None:
@@ -345,15 +376,18 @@ def _check_input_sampling_period(self, sampling_period, time_column, time_unit,
         """
         if sampling_period is None:
             if time_column is not None:
-                data_sampling = np.unique(np.diff(sorted(np.unique(data[:, 1]))))
+                data_sampling = np.unique(
+                    np.diff(sorted(np.unique(data[:, time_column]))))
                 if len(data_sampling) > 1:
-                    raise ValueError(f"Different sampling distances found in " "data set ({data_sampling})")
+                    raise ValueError("Different sampling distances found in "
+                                     f"data set ({data_sampling})")
                 else:
                     dt = data_sampling[0]
             else:
-                raise ValueError("Can not estimate sampling rate without time " "column id provided.")
+                raise ValueError('Can not estimate sampling rate without time '
+                                 'column id provided.')
-            sampling_period = pq.CompoundUnit(str(dt) + "*" + time_unit.units.u_symbol)
+            sampling_period = pq.CompoundUnit(str(dt) + '*' +
+                                              time_unit.units.u_symbol)
-        elif not isinstance(sampling_period, pq.UnitQuantity):
+        elif not isinstance(sampling_period, pq.Quantity):
             raise ValueError("sampling_period is not specified as a unit.")
         return sampling_period
@@ -410,22 +444,26 @@ def _get_selected_ids(self, gid, id_column, time_column, t_start, t_stop, time_u
 
-        Returns list of selected gids
+        Returns start and stop indices of the selected data rows
         """
-        gid_ids = np.array([0, data.shape[0]])
+        gids = np.array([0, data.shape[0]])
         if id_column is not None:
-            gid_ids = np.array(
-                [np.searchsorted(data[:, 0], gid, side="left"), np.searchsorted(data[:, 0], gid, side="right")]
-            )
-        gid_data = data[gid_ids[0] : gid_ids[1], :]
+            gids = np.array([np.searchsorted(data[:, id_column], gid, side='left'),
+                             np.searchsorted(data[:, id_column], gid, side='right')])
+        gid_data = data[gids[0]:gids[1], :]
 
         # select only requested time range
         id_shifts = np.array([0, 0])
         if time_column is not None:
-            id_shifts[0] = np.searchsorted(gid_data[:, 1], t_start.rescale(time_unit).magnitude, side="left")
+            id_shifts[0] = np.searchsorted(gid_data[:, time_column],
+                                           t_start.rescale(time_unit).magnitude,
+                                           side="left")
             id_shifts[1] = (
-                np.searchsorted(gid_data[:, 1], t_stop.rescale(time_unit).magnitude, side="left") - gid_data.shape[0]
+                np.searchsorted(gid_data[:, time_column],
+                                t_stop.rescale(time_unit).magnitude,
+                                side="left")
+                - gid_data.shape[0]
             )
 
-        selected_ids = gid_ids + id_shifts
+        selected_ids = gids + id_shifts
         return selected_ids
 
     def read_block(
@@ -446,21 +484,13 @@
     ):
         assert not lazy, "Do not support lazy"
 
-        seg = self.read_segment(
-            gid_list,
-            time_unit,
-            t_start,
-            t_stop,
-            sampling_period,
-            id_column_dat,
-            time_column_dat,
-            value_columns_dat,
-            id_column_gdf,
-            time_column_gdf,
-            value_types,
-            value_units,
-        )
-        blk = Block(file_origin=seg.file_origin, file_datetime=seg.file_datetime)
+        seg = self.read_segment(gid_list, time_unit, t_start,
+                                t_stop, sampling_period, id_column_dat,
+                                time_column_dat, value_columns_dat,
+                                id_column_gdf, time_column_gdf, value_types,
+                                value_units)
+        blk = Block(file_origin=seg.file_origin,
+                    file_datetime=seg.file_datetime)
         blk.segments.append(seg)
         return blk
 
@@ -489,8 +519,8 @@
         gid_list : list, default: None
             A list of GDF IDs of which to return SpikeTrain(s). gid_list must
             be specified if the GDF file contains neuron IDs, the default None
-            then raises an error. Specify an empty list [] to retrieve the spike
-            trains of all neurons.
+            then raises an error. Specify an empty list [] to retrieve the
+            spike trains of all neurons.
         time_unit : Quantity (time), optional, default: quantities.ms
             The time unit of recorded time stamps in DAT as well as GDF files.
         t_start : Quantity (time), optional, default: 0 * pq.ms
@@ -526,7 +556,8 @@
         if isinstance(gid_list, tuple):
             if gid_list[0] > gid_list[1]:
-                raise ValueError("The second entry in gid_list must be " "greater or equal to the first entry.")
+                raise ValueError("The second entry in gid_list must be "
+                                 "greater than or equal to the first entry.")
             gid_list = range(gid_list[0], gid_list[1] + 1)
 
         # __read_xxx() needs a list of IDs
@@ -535,12 +566,11 @@
         # create an empty Segment
         seg = Segment(file_origin=",".join(self.filenames))
-        seg.file_datetime = datetime.fromtimestamp(os.stat(self.filenames[0]).st_mtime)
-        # todo: rather than take the first file for the timestamp, we should take the oldest
-        # in practice, there won't be much difference
+        seg.file_datetime = datetime.fromtimestamp(
+            os.stat(self.filenames[-1]).st_mtime)
 
         # Load analogsignals and attach to Segment
-        if "dat" in self.avail_formats:
+        if self.target_object == 'AnalogSignal':
             seg.analogsignals = self.__read_analogsignals(
                 gid_list,
                 time_unit,
@@ -551,9 +581,8 @@
                 time_column=time_column_dat,
                 value_columns=value_columns_dat,
                 value_types=value_types,
-                value_units=value_units,
-            )
-        if "gdf" in self.avail_formats:
+                value_units=value_units)
+        if self.target_object == 'SpikeTrain':
             seg.spiketrains = self.__read_spiketrains(
                 gid_list, time_unit, t_start, t_stop, id_column=id_column_gdf, time_column=time_column_gdf
             )
@@ -673,7 +702,7 @@ class ColumnIO:
     Class for reading an ASCII file containing multiple columns of data.
     """
 
-    def __init__(self, filename):
+    def __init__(self, filename, **kwargs):
         """
         filename: string, path to ASCII file to read.
+        kwargs: additional keyword arguments are forwarded to numpy.loadtxt.
         """
@@ -683,12 +712,26 @@ def __init__(self, filename):
         # read the first line to check the data type (int or float) of the data
         f = open(self.filename)
         line = f.readline()
+        header_size = 0
+
+        # Check how many header lines the file has so they can be skipped;
+        # a data line starts with a (possibly signed) number
+        while line:
+            if line.strip().lstrip('-+')[:1].isdigit():
+                break
+            else:
+                header_size += 1
+                line = f.readline()
 
-        additional_parameters = {}
-        if "." not in line:
-            additional_parameters["dtype"] = np.int32
+        # Warn the user when header lines are skipped
+        if header_size > 0:
+            warnings.warn(f'Ignoring {header_size} header lines.')
 
-        self.data = np.loadtxt(self.filename, **additional_parameters)
+        # Respect a user-supplied dtype;
+        # otherwise infer it from the first data line
+        if 'dtype' not in kwargs:
+            kwargs['dtype'] = np.int32 if '.' not in line else np.float64
+
+        self.data = np.loadtxt(self.filename, skiprows=header_size, **kwargs)
 
         if len(self.data.shape) == 1:
             self.data = self.data[:, np.newaxis]
@@ -719,10 +762,10 @@ def get_columns(self, column_ids="all", condition=None, condition_column=None, s
             column_ids = np.array(column_ids)
 
         if column_ids is not None:
-            if max(column_ids) >= len(self.data) - 1:
-                raise ValueError(
-                    f"Can not load column ID {max(column_ids)}. File contains " f"only {len(self.data)} columns"
-                )
+            if max(column_ids) > self.data.shape[1] - 1:
+                raise ValueError(f'Can not load column ID {max(column_ids)}. File contains '
+                                 f'only {self.data.shape[1]} columns.')
 
         if sorting_columns is not None:
             if isinstance(sorting_columns, int):
diff --git a/neo/test/iotest/test_nestio.py b/neo/test/iotest/test_nestio.py
index 86d7fab97..268a38ff1 100644
--- a/neo/test/iotest/test_nestio.py
+++ b/neo/test/iotest/test_nestio.py
@@ -1,14 +1,10 @@
 """
-Tests of neo.io.exampleio
+Tests of neo.io.nestio
 """
-
 import warnings
-
 import unittest
-
 import quantities as pq
 import numpy as np
-
 from neo.io.nestio import ColumnIO
 from neo.io.nestio import NestIO
 from neo.test.iotest.common_io_test import BaseTestIO
@@ -25,78 +21,42 @@ def test_read_analogsignal(self):
         - with GIDs, with times as floats
         - with GIDs, with time as integer
         """
-        filename = self.get_local_path("nest/0gid-1time-2gex-3Vm-1261-0.dat")
-        r = NestIO(filenames=filename)
-        r.read_analogsignal(
-            gid=1,
-            t_stop=1000.0 * pq.ms,
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column=0,
-            time_column=1,
-            value_column=2,
-            value_type="V_m",
-        )
-        r.read_segment(
-            gid_list=[1],
-            t_stop=1000.0 * pq.ms,
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column_dat=0,
-            time_column_dat=1,
-            value_columns_dat=2,
-            value_types="V_m",
-        )
-
-        filename = self.get_local_path("nest/0gid-1time_in_steps-2Vm-1263-0.dat")
-        r = NestIO(filenames=filename)
-        r.read_analogsignal(
-            gid=1,
-            t_stop=1000.0 * pq.ms,
-            time_unit=pq.CompoundUnit("0.1*ms"),
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column=0,
-            time_column=1,
-            value_column=2,
-            value_type="V_m",
-        )
-        r.read_segment(
-            gid_list=[1],
-            t_stop=1000.0 * pq.ms,
-            time_unit=pq.CompoundUnit("0.1*ms"),
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column_dat=0,
-            time_column_dat=1,
-            value_columns_dat=2,
-            value_types="V_m",
-        )
-
-        filename = self.get_local_path("nest/0gid-1time-2Vm-1259-0.dat")
-        r = NestIO(filenames=filename)
-        r.read_analogsignal(
-            gid=1,
-            t_stop=1000.0 * pq.ms,
-            time_unit=pq.CompoundUnit("0.1*ms"),
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column=0,
-            time_column=1,
-            value_column=2,
-            value_type="V_m",
-        )
-        r.read_segment(
-            gid_list=[1],
-            t_stop=1000.0 * pq.ms,
-            time_unit=pq.CompoundUnit("0.1*ms"),
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column_dat=0,
-            time_column_dat=1,
-            value_columns_dat=2,
-            value_types="V_m",
-        )
+        filename = self.get_local_path('nest/0gid-1time-2gex-3Vm-1261-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
+        r.read_analogsignal(gid=1, t_stop=1000. * pq.ms,
+                            sampling_period=pq.ms, lazy=False,
+                            id_column=0, time_column=1,
+                            value_column=2, value_type='V_m')
+        r.read_segment(gid_list=[1], t_stop=1000. * pq.ms,
+                       sampling_period=pq.ms, lazy=False, id_column_dat=0,
+                       time_column_dat=1, value_columns_dat=2,
+                       value_types='V_m')
+
+        filename = self.get_local_path('nest/0gid-1time_in_steps-2Vm-1263-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
+        r.read_analogsignal(gid=1, t_stop=1000. * pq.ms,
+                            time_unit=pq.CompoundUnit('0.1*ms'),
+                            sampling_period=pq.ms, lazy=False,
+                            id_column=0, time_column=1,
+                            value_column=2, value_type='V_m')
+        r.read_segment(gid_list=[1], t_stop=1000. * pq.ms,
+                       time_unit=pq.CompoundUnit('0.1*ms'),
+                       sampling_period=pq.ms, lazy=False, id_column_dat=0,
+                       time_column_dat=1, value_columns_dat=2,
+                       value_types='V_m')
+
+        filename = self.get_local_path('nest/0gid-1time-2Vm-1259-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
+        r.read_analogsignal(gid=1, t_stop=1000. * pq.ms,
+                            time_unit=pq.CompoundUnit('0.1*ms'),
+                            sampling_period=pq.ms, lazy=False,
+                            id_column=0, time_column=1,
+                            value_column=2, value_type='V_m')
+        r.read_segment(gid_list=[1], t_stop=1000. * pq.ms,
+                       time_unit=pq.CompoundUnit('0.1*ms'),
+                       sampling_period=pq.ms, lazy=False, id_column_dat=0,
+                       time_column_dat=1, value_columns_dat=2,
+                       value_types='V_m')
 
     def test_id_column_none_multiple_neurons(self):
         """
@@ -107,12 +67,13 @@ def test_id_column_none_multiple_neurons(self):
         filename = self.get_local_path("nest/0time-1255-0.gdf")
         r = NestIO(filenames=filename)
         with self.assertRaises(ValueError):
-            r.read_analogsignal(
-                t_stop=1000.0 * pq.ms, lazy=False, sampling_period=pq.ms, id_column=None, time_column=0, value_column=1
-            )
-            r.read_segment(
-                t_stop=1000.0 * pq.ms, lazy=False, sampling_period=pq.ms, id_column_gdf=None, time_column_gdf=0
-            )
+            r.read_spiketrain(t_stop=1000. * pq.ms, lazy=False,
+                              sampling_period=pq.ms,
+                              id_column=None, time_column=0,
+                              value_column=1)
+            r.read_segment(t_stop=1000. * pq.ms, lazy=False,
+                           sampling_period=pq.ms, id_column_gdf=None,
+                           time_column_gdf=0)
 
     def test_values(self):
         """
@@ -120,17 +81,12 @@ def test_values(self):
         """
         filename = self.get_local_path("nest/0gid-1time-2gex-3Vm-1261-0.dat")
         id_to_test = 1
-        r = NestIO(filenames=filename)
-        seg = r.read_segment(
-            gid_list=[id_to_test],
-            t_stop=1000.0 * pq.ms,
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column_dat=0,
-            time_column_dat=1,
-            value_columns_dat=2,
-            value_types="V_m",
-        )
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
+        seg = r.read_segment(gid_list=[id_to_test],
+                             t_stop=1000. * pq.ms,
+                             sampling_period=pq.ms, lazy=False,
+                             id_column_dat=0, time_column_dat=1,
+                             value_columns_dat=2, value_types='V_m')
 
         dat = np.loadtxt(filename)
         target_data = dat[:, 2][np.where(dat[:, 0] == id_to_test)]
@@ -142,8 +98,8 @@ def test_read_segment(self):
         """
         Tests if signals are correctly stored in a segment.
         """
-        filename = self.get_local_path("nest/0gid-1time-2gex-1262-0.dat")
-        r = NestIO(filenames=filename)
+        filename = self.get_local_path('nest/0gid-1time-2gex-1262-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
 
         id_list_to_test = range(1, 10)
         seg = r.read_segment(
@@ -177,8 +133,8 @@ def test_read_block(self):
         """
         Tests if signals are correctly stored in a block.
         """
-        filename = self.get_local_path("nest/0gid-1time-2gex-1262-0.dat")
-        r = NestIO(filenames=filename)
+        filename = self.get_local_path('nest/0gid-1time-2gex-1262-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
 
         id_list_to_test = range(1, 10)
         blk = r.read_block(
@@ -204,23 +160,17 @@ def test_wrong_input(self):
           specifying a value_unit
         - User specifies t_start < 1.*sampling_period
         """
-        filename = self.get_local_path("nest/0gid-1time-2gex-1262-0.dat")
-        r = NestIO(filenames=filename)
+        filename = self.get_local_path('nest/0gid-1time-2gex-1262-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
         with self.assertRaises(ValueError):
             r.read_segment(t_stop=1000.0 * pq.ms, lazy=False, id_column_dat=0, time_column_dat=1)
         with self.assertRaises(ValueError):
             r.read_segment()
 
         with self.assertRaises(ValueError):
-            r.read_segment(
-                gid_list=[1],
-                t_stop=1000.0 * pq.ms,
-                sampling_period=1.0 * pq.ms,
-                lazy=False,
-                id_column_dat=0,
-                time_column_dat=1,
-                value_columns_dat=2,
-                value_types="V_m",
-            )
+            r.read_segment(gid_list=[1], t_stop=1000. * pq.ms,
+                           sampling_period=1., lazy=False,
+                           id_column_dat=0, time_column_dat=1,
+                           value_columns_dat=2, value_types='V_m')
 
         with self.assertRaises(ValueError):
             r.read_segment(
@@ -238,8 +188,8 @@ def test_t_start_t_stop(self):
         """
         Test for correct t_start and t_stop values of AnalogSignalArrays.
         """
-        filename = self.get_local_path("nest/0gid-1time-2gex-1262-0.dat")
-        r = NestIO(filenames=filename)
+        filename = self.get_local_path('nest/0gid-1time-2gex-1262-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
 
         t_start_targ = 450.0 * pq.ms
         t_stop_targ = 480.0 * pq.ms
@@ -263,8 +213,8 @@ def test_notimeid(self):
         """
         Test for warning, when no time column id was provided.
         """
-        filename = self.get_local_path("nest/0gid-1time-2gex-1262-0.dat")
-        r = NestIO(filenames=filename)
+        filename = self.get_local_path('nest/0gid-1time-2gex-1262-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
 
         t_start_targ = 450.0 * pq.ms
         t_stop_targ = 460.0 * pq.ms
@@ -296,8 +246,8 @@ def test_multiple_value_columns(self):
         """
         Test for simultaneous loading of multiple columns from dat file.
         """
-        filename = self.get_local_path("nest/0gid-1time-2Vm-3Iex-4Iin-1264-0.dat")
-        r = NestIO(filenames=filename)
+        filename = self.get_local_path('nest/0gid-1time-2Vm-3Iex-4Iin-1264-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
 
         sampling_period = pq.CompoundUnit("5*ms")
         seg = r.read_segment(gid_list=[1001], value_columns_dat=[2, 3], sampling_period=sampling_period)
@@ -305,51 +255,34 @@ def test_multiple_value_columns(self):
         self.assertEqual(len(anasigs), 2)
 
     def test_single_gid(self):
-        filename = self.get_local_path("nest/N1-0gid-1time-2Vm-1265-0.dat")
-        r = NestIO(filenames=filename)
-        anasig = r.read_analogsignal(
-            gid=1,
-            t_stop=1000.0 * pq.ms,
-            time_unit=pq.CompoundUnit("0.1*ms"),
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column=0,
-            time_column=1,
-            value_column=2,
-            value_type="V_m",
-        )
-        assert anasig.annotations["id"] == 1
+        filename = self.get_local_path('nest/N1-0gid-1time-2Vm-1265-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
+        anasig = r.read_analogsignal(gid=1, t_stop=1000. * pq.ms,
+                                     time_unit=pq.CompoundUnit('0.1*ms'),
+                                     sampling_period=pq.ms, lazy=False,
+                                     id_column=0, time_column=1,
+                                     value_column=2, value_type='V_m')
+        assert anasig.annotations['id'] == 1
 
     def test_no_gid(self):
-        filename = self.get_local_path("nest/N1-0time-1Vm-1266-0.dat")
-        r = NestIO(filenames=filename)
-        anasig = r.read_analogsignal(
-            gid=None,
-            t_stop=1000.0 * pq.ms,
-            time_unit=pq.CompoundUnit("0.1*ms"),
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column=None,
-            time_column=0,
-            value_column=1,
-            value_type="V_m",
-        )
-        self.assertEqual(anasig.annotations["id"], None)
+        filename = self.get_local_path('nest/N1-0time-1Vm-1266-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
+        anasig = r.read_analogsignal(gid=None, t_stop=1000. * pq.ms,
+                                     time_unit=pq.CompoundUnit('0.1*ms'),
+                                     sampling_period=pq.ms, lazy=False,
+                                     id_column=None, time_column=0,
+                                     value_column=1, value_type='V_m')
+        self.assertEqual(anasig.annotations['id'], None)
         self.assertEqual(len(anasig), 19)
 
     def test_no_gid_no_time(self):
-        filename = self.get_local_path("nest/N1-0Vm-1267-0.dat")
-        r = NestIO(filenames=filename)
-        anasig = r.read_analogsignal(
-            gid=None,
-            sampling_period=pq.ms,
-            lazy=False,
-            id_column=None,
-            time_column=None,
-            value_column=0,
-            value_type="V_m",
-        )
-        self.assertEqual(anasig.annotations["id"], None)
+        filename = self.get_local_path('nest/N1-0Vm-1267-0.dat')
+        r = NestIO(filenames=filename, target_object='AnalogSignal')
+        anasig = r.read_analogsignal(gid=None,
+                                     sampling_period=pq.ms, lazy=False,
+                                     id_column=None, time_column=None,
+                                     value_column=0, value_type='V_m')
+        self.assertEqual(anasig.annotations['id'], None)
         self.assertEqual(len(anasig), 19)
 
@@ -479,13 +412,13 @@ def test_read_float(self):
         """
         filename = self.get_local_path("nest/0gid-1time-1256-0.gdf")
         r = NestIO(filenames=filename)
-        st = r.read_spiketrain(
-            gdf_id=1, t_start=400.0 * pq.ms, t_stop=500.0 * pq.ms, lazy=False, id_column=0, time_column=1
-        )
-        self.assertTrue(st.magnitude.dtype == float)
-        seg = r.read_segment(
-            gid_list=[1], t_start=400.0 * pq.ms, t_stop=500.0 * pq.ms, lazy=False, id_column_gdf=0, time_column_gdf=1
-        )
+        st = r.read_spiketrain(gdf_id=1, t_start=400. * pq.ms,
+                               t_stop=500. * pq.ms,
+                               lazy=False, id_column=0, time_column=1)
+        self.assertTrue(st.magnitude.dtype == np.float64)
+        seg = r.read_segment(gid_list=[1], t_start=400. * pq.ms,
+                             t_stop=500. * pq.ms,
+                             lazy=False, id_column_gdf=0, time_column_gdf=1)
         sts = seg.spiketrains
         self.assertTrue(all([s.magnitude.dtype == np.float64 for s in sts]))
@@ -711,32 +644,30 @@ def test_read_spiketrain_can_return_empty_spiketrain(self):
         self.assertEqual(st.size, 0)
 
-class TestNestIO_multiple_signal_types(BaseTestIO, unittest.TestCase):
-    ioclass = NestIO
-    entities_to_test = []
-    entities_to_download = ["nest"]
-
-    def test_read_analogsignal_and_spiketrain(self):
-        """
-        Test if spiketrains and analogsignals can be read simultaneously
-        using read_segment
-        """
-        files = ["nest/0gid-1time-2gex-3Vm-1261-0.dat", "nest/0gid-1time_in_steps-1258-0.gdf"]
-        filenames = [self.get_local_path(e) for e in files]
-
-        r = NestIO(filenames=filenames)
-        seg = r.read_segment(
-            gid_list=[],
-            t_start=400 * pq.ms,
-            t_stop=600 * pq.ms,
-            id_column_gdf=0,
-            time_column_gdf=1,
-            id_column_dat=0,
-            time_column_dat=1,
-            value_columns_dat=2,
-        )
-        self.assertEqual(len(seg.spiketrains), 50)
-        self.assertEqual(len(seg.analogsignals), 50)
+# class TestNestIO_multiple_signal_types(BaseTestIO, unittest.TestCase):
+#     ioclass = NestIO
+#     entities_to_test = []
+#     entities_to_download = [
+#         'nest'
+#     ]
+#
+#     def test_read_analogsignal_and_spiketrain(self):
+#         """
+#         Test if spiketrains and analogsignals can be read simultaneously
+#         using read_segment
+#         """
+#         files = ['nest/0gid-1time-2gex-3Vm-1261-0.dat',
+#                  'nest/0gid-1time_in_steps-1258-0.gdf']
+#         filenames = [self.get_local_path(e) for e in files]
+#
+#         r = NestIO(filenames=filenames)
+#         seg = r.read_segment(gid_list=[], t_start=400 * pq.ms,
+#                              t_stop=600 * pq.ms,
+#                              id_column_gdf=0, time_column_gdf=1,
+#                              id_column_dat=0, time_column_dat=1,
+#                              value_columns_dat=2)
+#         self.assertEqual(len(seg.spiketrains), 50)
+#         self.assertEqual(len(seg.analogsignals), 50)
 
 
 class TestColumnIO(BaseTestIO, unittest.TestCase):
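
A minimal usage sketch of the reworked API introduced by this patch, for reference. The file names below are hypothetical placeholders; `target_object` selects which object type is read, `gid_list=[]` selects all neuron IDs present in the files, and extra keyword arguments (such as `max_rows` here) are forwarded to `numpy.loadtxt`:

import quantities as pq
from neo.io.nestio import NestIO

# Spike trains from two GDF files, merged into a single segment
# (placeholder file names).
r = NestIO(filenames=['spikes-1234-0.gdf', 'spikes-1234-1.gdf'],
           target_object='SpikeTrain')
seg = r.read_segment(gid_list=[], t_start=0 * pq.ms, t_stop=1000 * pq.ms,
                     id_column_gdf=0, time_column_gdf=1)

# Membrane potentials from a DAT file; max_rows is passed on to numpy.loadtxt.
r = NestIO(filenames='voltages-1234-0.dat', target_object='AnalogSignal',
           max_rows=100000)
seg = r.read_segment(gid_list=[], t_stop=1000 * pq.ms, sampling_period=pq.ms,
                     id_column_dat=0, time_column_dat=1,
                     value_columns_dat=2, value_types='V_m')

Reading both object types from the same set of files now requires two NestIO instances, one per target_object, which is why the combined test class above is disabled.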