Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rtmp/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {

info!(?app, ?stream_key, "Received stream");
thread::spawn(move || {
while let Ok(media_data) = receiver.recv() {
for media_data in receiver {
match media_data {
RtmpEvent::H264Config(video_config) => {
info!(?video_config, "video config")
Expand Down
11 changes: 7 additions & 4 deletions rtmp/src/server/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
net::TcpStream,
sync::{Arc, Mutex, atomic::AtomicBool, mpsc::channel},
sync::{Arc, Mutex, atomic::AtomicBool},
};

use tracing::{debug, trace};
Expand All @@ -11,7 +11,10 @@ use crate::{
protocol::{
handshake::Handshake, message_reader::RtmpMessageReader, message_writer::RtmpMessageWriter,
},
server::{OnConnectionCallback, RtmpConnection, negotiation::negotiate_rtmp_session},
server::{
OnConnectionCallback, RtmpConnection, negotiation::negotiate_rtmp_session,
rtmp_event_channel,
},
};

pub(crate) fn handle_connection(
Expand All @@ -28,12 +31,12 @@ pub(crate) fn handle_connection(

debug!(?app, ?stream_key, "Negotiation complete");

let (sender, receiver) = channel();
let (sender, receiver) = rtmp_event_channel();

let connection_ctx = RtmpConnection {
app: app.into(),
stream_key: stream_key.into(),
receiver, // TODO instead of returning a receiver, return custom iterator that exposes buffer details
receiver,
};

{
Expand Down
159 changes: 159 additions & 0 deletions rtmp/src/server/event_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
use std::{
collections::VecDeque,
sync::{Arc, Condvar, Mutex},
};

use crate::RtmpEvent;

#[derive(Debug, Clone)]
pub struct RtmpEventBufferSnapshot {
pub len: usize,
pub first: Option<RtmpEvent>,
pub last: Option<RtmpEvent>,
}

#[derive(Debug)]
pub struct RtmpEventSendError {
pub event: RtmpEvent,
}

#[derive(Debug)]
pub struct RtmpEventSender {
shared: Arc<(Mutex<ChannelState>, Condvar)>,
}

#[derive(Debug)]
pub struct RtmpEventReceiver {
shared: Arc<(Mutex<ChannelState>, Condvar)>,
}

#[derive(Debug, Default)]
struct ChannelState {
queue: VecDeque<RtmpEvent>,
sender_count: usize,
receiver_alive: bool,
}

pub fn rtmp_event_channel() -> (RtmpEventSender, RtmpEventReceiver) {
let state = ChannelState {
sender_count: 1,
receiver_alive: true,
..Default::default()
};
let shared = Arc::new((Mutex::new(state), Condvar::new()));
(
RtmpEventSender {
shared: shared.clone(),
},
RtmpEventReceiver { shared },
)
}

impl RtmpEventSender {
pub fn send(&self, event: RtmpEvent) -> Result<(), RtmpEventSendError> {
let (lock, cvar) = &*self.shared;
let mut state = lock.lock().unwrap();

if !state.receiver_alive {
return Err(RtmpEventSendError { event });
}

state.queue.push_back(event);
cvar.notify_one();
Ok(())
}
}

impl Clone for RtmpEventSender {
fn clone(&self) -> Self {
let (lock, _) = &*self.shared;
let mut state = lock.lock().unwrap();
state.sender_count += 1;

Self {
shared: self.shared.clone(),
}
}
}

impl Drop for RtmpEventSender {
fn drop(&mut self) {
let (lock, cvar) = &*self.shared;
let mut state = lock.lock().unwrap();
state.sender_count = state.sender_count.saturating_sub(1);
cvar.notify_all();
}
}

impl RtmpEventReceiver {
pub fn recv(&self) -> Option<RtmpEvent> {
let (lock, cvar) = &*self.shared;
let mut state = lock.lock().unwrap();

loop {
if let Some(event) = state.queue.pop_front() {
return Some(event);
}

if state.sender_count == 0 {
return None;
}

state = cvar.wait(state).unwrap();
}
}

pub fn peek(&self) -> Option<RtmpEvent> {
let (lock, _) = &*self.shared;
let state = lock.lock().unwrap();
state.queue.front().cloned()
}

pub fn len(&self) -> usize {
let (lock, _) = &*self.shared;
let state = lock.lock().unwrap();
state.queue.len()
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn first(&self) -> Option<RtmpEvent> {
self.peek()
}

pub fn last(&self) -> Option<RtmpEvent> {
let (lock, _) = &*self.shared;
let state = lock.lock().unwrap();
state.queue.back().cloned()
}

pub fn buffer_snapshot(&self) -> RtmpEventBufferSnapshot {
let (lock, _) = &*self.shared;
let state = lock.lock().unwrap();
RtmpEventBufferSnapshot {
len: state.queue.len(),
first: state.queue.front().cloned(),
last: state.queue.back().cloned(),
}
}
}

impl Iterator for RtmpEventReceiver {
type Item = RtmpEvent;

fn next(&mut self) -> Option<Self::Item> {
self.recv()
}
}

impl Drop for RtmpEventReceiver {
fn drop(&mut self) {
let (lock, cvar) = &*self.shared;
let mut state = lock.lock().unwrap();
state.receiver_alive = false;
state.queue.clear();
cvar.notify_all();
}
}
11 changes: 8 additions & 3 deletions rtmp/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
mpsc::Receiver,
};

use crate::{RtmpEvent, error::RtmpError, server::listen_thread::start_listener_thread};
use crate::{error::RtmpError, server::listen_thread::start_listener_thread};

mod connection;
mod event_channel;
mod listen_thread;
mod negotiation;

pub use event_channel::{
RtmpEventBufferSnapshot, RtmpEventReceiver, RtmpEventSendError, RtmpEventSender,
rtmp_event_channel,
};

pub type OnConnectionCallback = Box<dyn FnMut(RtmpConnection) + Send + 'static>;

pub struct RtmpConnection {
pub app: Arc<str>,
pub stream_key: Arc<str>,
pub receiver: Receiver<RtmpEvent>,
pub receiver: RtmpEventReceiver,
}

// TODO add SSL/TLS
Expand Down
14 changes: 6 additions & 8 deletions smelter-core/src/pipeline/rtmp/rtmp_input/connection.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::{
sync::{Arc, mpsc},
thread::JoinHandle,
time::Duration,
};
use std::{sync::Arc, thread::JoinHandle, time::Duration};

use crossbeam_channel::Sender;
use rtmp::{AacAudioConfig, AacAudioData, H264VideoConfig, H264VideoData, RtmpEvent};
use rtmp::{
AacAudioConfig, AacAudioData, H264VideoConfig, H264VideoData, RtmpEvent, RtmpEventReceiver,
};
use smelter_render::{Frame, InputId, error::ErrorStack};
use tracing::{Level, error, info, span, warn};

Expand Down Expand Up @@ -231,7 +229,7 @@ impl RtmpConnectionState {
pub(crate) fn start_connection_thread(
ctx: Arc<PipelineCtx>,
input_ref: Ref<InputId>,
receiver: mpsc::Receiver<RtmpEvent>,
receiver: RtmpEventReceiver,
options: RtmpConnectionOptions,
) -> JoinHandle<()> {
std::thread::Builder::new()
Expand All @@ -248,7 +246,7 @@ pub(crate) fn start_connection_thread(
let mut state = RtmpConnectionState::new(ctx, input_ref, options);
info!("RTMP stream connection opened");

while let Ok(rtmp_event) = receiver.recv() {
for rtmp_event in receiver {
state.handle_rtmp_event(rtmp_event);
}

Expand Down