@@ -21,10 +21,9 @@ class TestIntanRawIO(
21
21
"intan/intan_fpc_rhs_test_240329_091637/info.rhs" , # Format one-file-per-channel
22
22
"intan/intan_fps_rhs_test_240329_091536/info.rhs" , # Format one-file-per-signal
23
23
"intan/rhd_fpc_multistim_240514_082044/info.rhd" , # Multiple digital channels one-file-per-channel rhd
24
- "intan/rhs_stim_data_single_file_format/intanTestFile.rhs" , # header-attached rhs data with stimulus current
25
- "intan/test_fcs_dc_250327_154333/info.rhs" , # this is an example of only having dc amp rather than amp files
26
- "intan/test_fpc_stim_250327_151617/info.rhs" , # wrong files names Heberto will fix naimgin in the future
27
-
24
+ "intan/rhs_stim_data_single_file_format/intanTestFile.rhs" , # header-attached rhs data with stimulus current
25
+ "intan/test_fcs_dc_250327_154333/info.rhs" , # this is an example of only having dc amp rather than amp files
26
+ "intan/test_fpc_stim_250327_151617/info.rhs" , # wrong files names Heberto will fix naimgin in the future
28
27
]
29
28
30
29
def test_annotations (self ):
@@ -88,7 +87,9 @@ def test_correct_reading_one_file_per_channel_amplifiers(self):
88
87
amplifier_stream_index = 0
89
88
for channel_name , amplifier_file_path in zip (channel_names , amplifier_file_paths ):
90
89
data_raw = np .fromfile (amplifier_file_path , dtype = np .int16 ).squeeze ()
91
- data_from_neo = intan_reader .get_analogsignal_chunk (channel_ids = [channel_name ], stream_index = amplifier_stream_index ).squeeze ()
90
+ data_from_neo = intan_reader .get_analogsignal_chunk (
91
+ channel_ids = [channel_name ], stream_index = amplifier_stream_index
92
+ ).squeeze ()
92
93
np .testing .assert_allclose (data_raw , data_from_neo )
93
94
94
95
def test_correct_reading_one_file_per_channel_rhs_stim (self ):
@@ -101,42 +102,45 @@ def test_correct_reading_one_file_per_channel_rhs_stim(self):
101
102
# This should be the folder where the files of all the channels are stored
102
103
folder_path = file_path .parent
103
104
104
- # The paths for the stim channels are stim-A-000.dat, stim-A-001.dat, stim-A-002.dat,
105
+ # The paths for the stim channels are stim-A-000.dat, stim-A-001.dat, stim-A-002.dat,
105
106
# Whereas the ids are A-001_STIM, A-002_STIM, A-003_STIM, etc
106
107
stim_file_paths = [path for path in folder_path .iterdir () if "stim" in path .name ]
107
108
channel_ids = [f"{ p .stem [5 :]} _STIM" for p in stim_file_paths ]
108
-
109
- stim_stream_index = 2
109
+
110
+ stim_stream_index = 2
110
111
for channel_id , amplifier_file_path in zip (channel_ids , stim_file_paths ):
111
112
data_raw = np .fromfile (amplifier_file_path , dtype = np .uint16 )
112
113
decoded_data = intan_reader ._decode_current_from_stim_data (data_raw , 0 , data_raw .shape [0 ])
113
- data_from_neo = intan_reader .get_analogsignal_chunk (channel_ids = [channel_id ], stream_index = stim_stream_index ).squeeze ()
114
+ data_from_neo = intan_reader .get_analogsignal_chunk (
115
+ channel_ids = [channel_id ], stream_index = stim_stream_index
116
+ ).squeeze ()
114
117
np .testing .assert_allclose (decoded_data , data_from_neo )
115
118
116
-
117
119
def test_correct_decoding_of_stimulus_current (self ):
118
120
# See https://github.com/NeuralEnsemble/python-neo/pull/1660 for discussion
119
- # See https://gin.g-node.org/NeuralEnsemble/ephy_testing_data/src/master/intan/README.md#rhs_stim_data_single_file_format
120
- # For a description of the data
121
-
121
+ # See https://gin.g-node.org/NeuralEnsemble/ephy_testing_data/src/master/intan/README.md#rhs_stim_data_single_file_format
122
+ # For a description of the data
123
+
122
124
file_path = Path (self .get_local_path ("intan/rhs_stim_data_single_file_format/intanTestFile.rhs" ))
123
125
intan_reader = IntanRawIO (filename = file_path )
124
126
intan_reader .parse_header ()
125
-
126
- signal_streams = intan_reader .header [' signal_streams' ]
127
- stream_ids = signal_streams ['id' ].tolist ()
128
- stream_index = stream_ids .index ('11' )
127
+
128
+ signal_streams = intan_reader .header [" signal_streams" ]
129
+ stream_ids = signal_streams ["id" ].tolist ()
130
+ stream_index = stream_ids .index ("11" )
129
131
sampling_rate = intan_reader .get_signal_sampling_rate (stream_index = stream_index )
130
132
sig_chunk = intan_reader .get_analogsignal_chunk (stream_index = stream_index , channel_ids = ["D-016_STIM" ])
131
- final_stim = intan_reader .rescale_signal_raw_to_float (sig_chunk , stream_index = stream_index , channel_ids = ["D-016_STIM" ])
133
+ final_stim = intan_reader .rescale_signal_raw_to_float (
134
+ sig_chunk , stream_index = stream_index , channel_ids = ["D-016_STIM" ]
135
+ )
132
136
133
137
# This contains only the first pulse and I got this by visual inspection
134
138
data_to_test = final_stim [200 :250 ]
135
-
139
+
136
140
positive_pulse_size = np .max (data_to_test ).item ()
137
141
negative_pulse_size = np .min (data_to_test ).item ()
138
142
139
- expected_value = 60 * 1e-6 # 60 microamperes
143
+ expected_value = 60 * 1e-6 # 60 microamperes
140
144
141
145
# Assert is close float
142
146
assert np .isclose (positive_pulse_size , expected_value )
@@ -146,15 +150,15 @@ def test_correct_decoding_of_stimulus_current(self):
146
150
argmin = np .argmin (data_to_test )
147
151
argmax = np .argmax (data_to_test )
148
152
assert argmin < argmax
149
-
153
+
150
154
# Check that the negative pulse is 200 us long
151
155
negative_pulse_frames = np .where (data_to_test > 0 )[0 ]
152
156
number_of_negative_frames = negative_pulse_frames .size
153
157
duration_of_negative_pulse = number_of_negative_frames / sampling_rate
154
158
155
- expected_duration = 200 * 1e-6 # 400 microseconds / 2
159
+ expected_duration = 200 * 1e-6 # 400 microseconds / 2
156
160
assert np .isclose (duration_of_negative_pulse , expected_duration )
157
-
161
+
158
162
# Check that the positive pulse is 200 us long
159
163
positive_pulse_frames = np .where (data_to_test > 0 )[0 ]
160
164
number_of_positive_frames = positive_pulse_frames .size
@@ -163,30 +167,31 @@ def test_correct_decoding_of_stimulus_current(self):
163
167
164
168
assert np .isclose (duration_of_positive_pulse , expected_duration )
165
169
166
-
167
170
def test_correct_decoding_of_stimulus_current (self ):
168
171
# See https://github.com/NeuralEnsemble/python-neo/pull/1660 for discussion
169
- # See https://gin.g-node.org/NeuralEnsemble/ephy_testing_data/src/master/intan/README.md#rhs_stim_data_single_file_format
170
- # For a description of the data
171
-
172
+ # See https://gin.g-node.org/NeuralEnsemble/ephy_testing_data/src/master/intan/README.md#rhs_stim_data_single_file_format
173
+ # For a description of the data
174
+
172
175
file_path = Path (self .get_local_path ("intan/rhs_stim_data_single_file_format/intanTestFile.rhs" ))
173
176
intan_reader = IntanRawIO (filename = file_path )
174
177
intan_reader .parse_header ()
175
-
176
- signal_streams = intan_reader .header [' signal_streams' ]
177
- stream_ids = signal_streams ['id' ].tolist ()
178
- stream_index = stream_ids .index ('11' )
178
+
179
+ signal_streams = intan_reader .header [" signal_streams" ]
180
+ stream_ids = signal_streams ["id" ].tolist ()
181
+ stream_index = stream_ids .index ("11" )
179
182
sampling_rate = intan_reader .get_signal_sampling_rate (stream_index = stream_index )
180
183
sig_chunk = intan_reader .get_analogsignal_chunk (stream_index = stream_index , channel_ids = ["D-016_STIM" ])
181
- final_stim = intan_reader .rescale_signal_raw_to_float (sig_chunk , stream_index = stream_index , channel_ids = ["D-016_STIM" ])
184
+ final_stim = intan_reader .rescale_signal_raw_to_float (
185
+ sig_chunk , stream_index = stream_index , channel_ids = ["D-016_STIM" ]
186
+ )
182
187
183
188
# This contains only the first pulse and I got this by visual inspection
184
189
data_to_test = final_stim [200 :250 ]
185
-
190
+
186
191
positive_pulse_size = np .max (data_to_test ).item ()
187
192
negative_pulse_size = np .min (data_to_test ).item ()
188
193
189
- expected_value = 60 * 1e-6 # 60 microamperes
194
+ expected_value = 60 * 1e-6 # 60 microamperes
190
195
191
196
# Assert is close float
192
197
assert np .isclose (positive_pulse_size , expected_value )
@@ -196,15 +201,15 @@ def test_correct_decoding_of_stimulus_current(self):
196
201
argmin = np .argmin (data_to_test )
197
202
argmax = np .argmax (data_to_test )
198
203
assert argmin < argmax
199
-
204
+
200
205
# Check that the negative pulse is 200 us long
201
206
negative_pulse_frames = np .where (data_to_test > 0 )[0 ]
202
207
number_of_negative_frames = negative_pulse_frames .size
203
208
duration_of_negative_pulse = number_of_negative_frames / sampling_rate
204
209
205
- expected_duration = 200 * 1e-6 # 400 microseconds / 2
210
+ expected_duration = 200 * 1e-6 # 400 microseconds / 2
206
211
assert np .isclose (duration_of_negative_pulse , expected_duration )
207
-
212
+
208
213
# Check that the positive pulse is 200 us long
209
214
positive_pulse_frames = np .where (data_to_test > 0 )[0 ]
210
215
number_of_positive_frames = positive_pulse_frames .size
0 commit comments