Skip to content

Commit d419f0f

Browse files
committed
sndio: change input/output_callbacks storage to a HashMap indexed by usize
1 parent 51d3486 commit d419f0f

File tree

2 files changed

+46
-87
lines changed

2 files changed

+46
-87
lines changed

src/host/sndio/mod.rs

Lines changed: 33 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -318,11 +318,11 @@ enum InnerState {
318318

319319
/// Each input Stream that has not been dropped has its callbacks in an element of this Vec.
320320
/// The last element is guaranteed to not be None.
321-
input_callbacks: Vec<Option<InputCallbacks>>,
321+
input_callbacks: (usize, HashMap<usize, InputCallbacks>),
322322

323323
/// Each output Stream that has not been dropped has its callbacks in an element of this Vec.
324324
/// The last element is guaranteed to not be None.
325-
output_callbacks: Vec<Option<OutputCallbacks>>,
325+
output_callbacks: (usize, HashMap<usize, OutputCallbacks>),
326326

327327
/// Whether the runner thread was spawned yet.
328328
thread_spawned: bool,
@@ -440,20 +440,16 @@ impl InnerState {
440440
ref mut wakeup_sender,
441441
..
442442
} => {
443-
for (i, cbs) in output_callbacks.iter_mut().enumerate() {
444-
if cbs.is_none() {
445-
*cbs = Some(callbacks);
446-
return Ok(i);
447-
}
448-
}
449443
// If there were previously no callbacks, wakeup the runner thread.
450-
if input_callbacks.len() == 0 && output_callbacks.len() == 0 {
444+
if input_callbacks.1.len() == 0 && output_callbacks.1.len() == 0 {
451445
if let Some(ref sender) = wakeup_sender {
452446
let _ = sender.send(());
453447
}
454448
}
455-
output_callbacks.push(Some(callbacks));
456-
Ok(output_callbacks.len() - 1)
449+
let index = output_callbacks.0;
450+
output_callbacks.1.insert(index, callbacks);
451+
output_callbacks.0 = index + 1;
452+
Ok(index)
457453
}
458454
_ => Err(backend_specific_error("device is not in a running state")),
459455
}
@@ -466,15 +462,7 @@ impl InnerState {
466462
InnerState::Running {
467463
ref mut output_callbacks,
468464
..
469-
} => {
470-
let cbs = output_callbacks[index].take().unwrap();
471-
while output_callbacks.len() > 0
472-
&& output_callbacks[output_callbacks.len() - 1].is_none()
473-
{
474-
output_callbacks.pop();
475-
}
476-
Ok(cbs)
477-
}
465+
} => Ok(output_callbacks.1.remove(&index).unwrap()),
478466
_ => Err(backend_specific_error("device is not in a running state")),
479467
}
480468
}
@@ -489,20 +477,16 @@ impl InnerState {
489477
ref mut wakeup_sender,
490478
..
491479
} => {
492-
for (i, cbs) in input_callbacks.iter_mut().enumerate() {
493-
if cbs.is_none() {
494-
*cbs = Some(callbacks);
495-
return Ok(i);
496-
}
497-
}
498480
// If there were previously no callbacks, wakeup the runner thread.
499-
if input_callbacks.len() == 0 && output_callbacks.len() == 0 {
481+
if input_callbacks.1.len() == 0 && output_callbacks.1.len() == 0 {
500482
if let Some(ref sender) = wakeup_sender {
501483
let _ = sender.send(());
502484
}
503485
}
504-
input_callbacks.push(Some(callbacks));
505-
Ok(input_callbacks.len() - 1)
486+
let index = input_callbacks.0;
487+
input_callbacks.1.insert(index, callbacks);
488+
input_callbacks.0 = index + 1;
489+
Ok(index)
506490
}
507491
_ => Err(backend_specific_error("device is not in a running state")),
508492
}
@@ -515,15 +499,7 @@ impl InnerState {
515499
InnerState::Running {
516500
ref mut input_callbacks,
517501
..
518-
} => {
519-
let cbs = input_callbacks[index].take().unwrap();
520-
while input_callbacks.len() > 0
521-
&& input_callbacks[input_callbacks.len() - 1].is_none()
522-
{
523-
input_callbacks.pop();
524-
}
525-
Ok(cbs)
526-
}
502+
} => Ok(input_callbacks.1.remove(&index).unwrap()),
527503
_ => Err(backend_specific_error("device is not in a running state")),
528504
}
529505
}
@@ -537,15 +513,11 @@ impl InnerState {
537513
..
538514
} => {
539515
let e = e.into();
540-
for cbs in input_callbacks {
541-
if let Some(cbs) = cbs {
542-
(cbs.error_callback)(e.clone());
543-
}
516+
for cbs in input_callbacks.1.values_mut() {
517+
(cbs.error_callback)(e.clone());
544518
}
545-
for cbs in output_callbacks {
546-
if let Some(cbs) = cbs {
547-
(cbs.error_callback)(e.clone());
548-
}
519+
for cbs in output_callbacks.1.values_mut() {
520+
(cbs.error_callback)(e.clone());
549521
}
550522
}
551523
_ => {} // Drop the error
@@ -596,8 +568,8 @@ impl InnerState {
596568
buffer_size,
597569
par,
598570
sample_rate_map: tmp,
599-
input_callbacks: vec![],
600-
output_callbacks: vec![],
571+
input_callbacks: (0, HashMap::new()),
572+
output_callbacks: (0, HashMap::new()),
601573
thread_spawned: false,
602574
wakeup_sender: None,
603575
};
@@ -618,6 +590,17 @@ impl InnerState {
618590
}
619591
}
620592
}
593+
594+
fn has_streams(&self) -> bool {
595+
match self {
596+
InnerState::Running {
597+
ref input_callbacks,
598+
ref output_callbacks,
599+
..
600+
} => input_callbacks.1.len() > 0 || output_callbacks.1.len() > 0,
601+
_ => false,
602+
}
603+
}
621604
}
622605

623606
impl Drop for InnerState {
@@ -1070,13 +1053,11 @@ impl Drop for Stream {
10701053

10711054
match *inner_state {
10721055
InnerState::Running {
1073-
ref input_callbacks,
1074-
ref output_callbacks,
10751056
ref thread_spawned,
10761057
ref wakeup_sender,
10771058
..
10781059
} => {
1079-
if input_callbacks.len() == 0 && output_callbacks.len() == 0 && *thread_spawned {
1060+
if !inner_state.has_streams() && *thread_spawned {
10801061
// Wake up runner thread so it can shut down
10811062
if let Some(ref sender) = wakeup_sender {
10821063
let _ = sender.send(());
@@ -1093,13 +1074,11 @@ impl Drop for Device {
10931074
let inner_state = self.inner_state.lock().unwrap();
10941075
match *inner_state {
10951076
InnerState::Running {
1096-
ref input_callbacks,
1097-
ref output_callbacks,
10981077
ref thread_spawned,
10991078
ref wakeup_sender,
11001079
..
11011080
} => {
1102-
if input_callbacks.len() == 0 && output_callbacks.len() == 0 && *thread_spawned {
1081+
if !inner_state.has_streams() && *thread_spawned {
11031082
// Wake up runner thread so it can shut down
11041083
if let Some(ref sender) = wakeup_sender {
11051084
let _ = sender.send(());

src/host/sndio/runner.rs

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@ pub(super) fn runner(inner_state_arc: Arc<Mutex<InnerState>>) {
8181
{
8282
let mut inner_state = inner_state_arc.lock().unwrap();
8383
// If there's nothing to do, wait until that's no longer the case.
84-
let input_cbs;
85-
let output_cbs;
8684
match *inner_state {
8785
InnerState::Init { .. } | InnerState::Opened { .. } => {
8886
// Unlikely error state
@@ -91,17 +89,10 @@ pub(super) fn runner(inner_state_arc: Arc<Mutex<InnerState>>) {
9189
));
9290
break;
9391
}
94-
InnerState::Running {
95-
ref input_callbacks,
96-
ref output_callbacks,
97-
..
98-
} => {
99-
input_cbs = input_callbacks;
100-
output_cbs = output_callbacks;
101-
}
92+
_ => {}
10293
}
10394

104-
if input_cbs.len() == 0 && output_cbs.len() == 0 {
95+
if !inner_state.has_streams() {
10596
if !paused {
10697
if let Err(_) = inner_state.stop() {
10798
// No callbacks to error with
@@ -137,12 +128,8 @@ pub(super) fn runner(inner_state_arc: Arc<Mutex<InnerState>>) {
137128
));
138129
break;
139130
}
140-
InnerState::Running {
141-
ref input_callbacks,
142-
ref output_callbacks,
143-
..
144-
} => {
145-
if input_callbacks.len() == 0 && output_callbacks.len() == 0 {
131+
InnerState::Running { .. } => {
132+
if !inner_state.has_streams() {
146133
// Spurious wakeup
147134
continue;
148135
}
@@ -274,7 +261,7 @@ pub(super) fn runner(inner_state_arc: Arc<Mutex<InnerState>>) {
274261
} => {
275262
if output_offset_bytes_into_buf == 0 {
276263
// The whole output buffer has been written (or this is the first time). Fill it.
277-
if output_callbacks.len() == 0 {
264+
if output_callbacks.1.len() == 0 {
278265
if clear_output_buf_needed {
279266
// There is probably nonzero data in the buffer from previous output
280267
// Streams. Zero it out.
@@ -284,16 +271,11 @@ pub(super) fn runner(inner_state_arc: Arc<Mutex<InnerState>>) {
284271
clear_output_buf_needed = false;
285272
}
286273
} else {
287-
for opt_cbs in output_callbacks {
288-
if let Some(cbs) = opt_cbs {
289-
// Really we shouldn't have more than one output callback as they are
290-
// stepping on each others' data.
291-
// TODO: perhaps we should not call these callbacks while holding the lock
292-
(cbs.data_callback)(
293-
&mut output_data,
294-
&output_callback_info,
295-
);
296-
}
274+
for cbs in output_callbacks.1.values_mut() {
275+
// Really we shouldn't have more than one output callback as they are
276+
// stepping on each others' data.
277+
// TODO: perhaps we should not call these callbacks while holding the lock
278+
(cbs.data_callback)(&mut output_data, &output_callback_info);
297279
}
298280
clear_output_buf_needed = true;
299281
}
@@ -403,11 +385,9 @@ pub(super) fn runner(inner_state_arc: Arc<Mutex<InnerState>>) {
403385
ref mut input_callbacks,
404386
..
405387
} => {
406-
for opt_cbs in input_callbacks {
407-
if let Some(cbs) = opt_cbs {
408-
// TODO: perhaps we should not call these callbacks while holding the lock
409-
(cbs.data_callback)(&input_data, &input_callback_info);
410-
}
388+
for cbs in input_callbacks.1.values_mut() {
389+
// TODO: perhaps we should not call these callbacks while holding the lock
390+
(cbs.data_callback)(&input_data, &input_callback_info);
411391
}
412392
}
413393
}

0 commit comments

Comments
 (0)