Skip to content

Commit 83f44ad

Browse files
committed
wip
1 parent 0eb5579 commit 83f44ad

File tree

4 files changed

+121
-49
lines changed

4 files changed

+121
-49
lines changed

smelter-core/CLAUDE.md

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,30 @@ Pipeline orchestration: `Pipeline` in `./src/pipeline/instance.rs`. Shared state
3030
- RTP, RTMP client, MP4, HLS, WHIP, WHEP (WebRTC egress), RawData/EncodedData (channel-based)
3131
- Example: `RtpOutput` `./src/pipeline/rtp/rtp_output.rs`
3232

33-
- **Protocol support**`./src/pipeline/rtp/` (payloader/depayloader/jitter buffer), `./src/pipeline/webrtc/server.rs` (WHIP/WHEP HTTP endpoints), `./src/pipeline/utils/` (H.264 format conversions)
34-
35-
3633
## Multimedia data flow
3734

3835
Input (demuxer) → Decoder → Queue → Rendering/AudioMixer → Encoder → Output (muxer)
3936

4037
In most cases, each element spawns at least one thread and communicates with other elements via channels.
38+
39+
## Timestamps
40+
41+
- Inputs always start PTS from `queue_sync_point.elapsed()` when first packet arrives. If ahead of time processing is enabled it might lead to unexpected behaviors.
42+
- Input is responsible for maintaining continuity of the stream until it sends EOS event. After EOS, it is expected to synchronize with `queue_sync_point.elspased()` again.
43+
- Queue handles input offsets. If input does not have offset then timestamps from inputs are preserved. If input has an offset then queue calculates the value based on first packet (from audio or video track).
44+
- Queue handles pausing inputs. Pause offset is calculated based on difference between `queue_sync_point.elapsed()` on pause and resume calls.
45+
- When `ahead_of_time_processing` is enabled queue might be processing PTS value a lot larger than `queue_sync_point.elapsed()`
46+
47+
## Alternative approach
48+
49+
- Inputs always start from some epoch.
50+
- Queue synchronizes on the first packet
51+
- How audio and video sync should work?
52+
- Eos needs and generation number
53+
- On each track audio and video synchronizes for current epoch
54+
- Sending Reset event will reset timestamps
55+
- Pause
56+
- Seek
57+
- Send Reset
58+
- Start stream
59+

smelter-core/src/queue/audio_queue.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,17 @@ impl AudioQueue {
8383
}
8484

8585
pub fn pause_input(&mut self, input_id: &InputId, pts: Duration) {
86-
if let Some(input) = self.inputs.get_mut(input_id) {
87-
if input.pause_state.pause(pts) {
88-
self.event_emitter
89-
.emit(Event::AudioInputStreamPaused(input_id.clone()));
90-
}
86+
if let Some(input) = self.inputs.get_mut(input_id)
87+
&& input.pause_state.pause(pts, &input.state)
88+
{
89+
self.event_emitter
90+
.emit(Event::AudioInputStreamPaused(input_id.clone()));
9191
}
9292
}
9393

9494
pub fn resume_input(&mut self, input_id: &InputId, pts: Duration) {
9595
if let Some(input) = self.inputs.get_mut(input_id) {
96-
let first_pts_received = input.shared_state.first_pts().is_some();
97-
if input.pause_state.resume(pts, first_pts_received) && first_pts_received {
96+
if input.pause_state.resume(pts, &input.state) {
9897
self.event_emitter
9998
.emit(Event::AudioInputStreamPlaying(input_id.clone()));
10099
}
@@ -281,7 +280,7 @@ impl AudioQueueInput {
281280
/// queue start.
282281
fn drop_old_samples_before_start(&mut self) {
283282
loop {
284-
if let QueueState::Draining = self.state {
283+
if let QueueState::Draining = self.state {
285284
self.reset_after_eos();
286285
}
287286
// if offset is defined try_enqueue_frame will always return err

smelter-core/src/queue/utils.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ impl EmitOnceGuard {
3434
}
3535
}
3636

37+
#[derive(PartialEq, Eq, Clone, Copy)]
3738
pub enum QueueState {
3839
WaitingForFirstPacket,
3940
Running,
@@ -43,7 +44,6 @@ pub enum QueueState {
4344
}
4445

4546
pub struct PauseState {
46-
paused: bool,
4747
/// Internal PTS (relative to sync_point) when input was paused.
4848
paused_at_pts: Option<Duration>,
4949
/// Accumulated pause duration to add to frame/sample PTS after resume.
@@ -53,36 +53,48 @@ pub struct PauseState {
5353
impl PauseState {
5454
pub fn new() -> Self {
5555
Self {
56-
paused: false,
5756
paused_at_pts: None,
5857
pts_offset: Duration::ZERO,
5958
}
6059
}
6160

62-
/// Returns `true` if the state actually changed (was not already paused).
61+
/// Sets paused state. Returns `true` if the pause state was changed
6362
pub fn pause(&mut self, pts: Duration) -> bool {
64-
// Make pause idempotent: if we're already paused, do not update the
65-
// original pause start timestamp to avoid distorting pts_offset.
66-
if self.paused {
63+
if self.paused_at_pts.is_some() {
6764
return false;
6865
}
69-
self.paused = true;
7066
self.paused_at_pts = Some(pts);
7167
true
7268
}
7369

74-
/// Returns `true` if the state actually changed (was paused before).
75-
pub fn resume(&mut self, pts: Duration, first_pts_received: bool) -> bool {
76-
if !self.paused {
70+
/// Clears paused state. Returns `true` if pause state was changed
71+
pub fn resume(&mut self, pts: Duration, state: QueueState) -> bool {
72+
if self.paused_at_pts.is_none() {
7773
return false;
7874
}
79-
self.paused = false;
80-
if let Some(pause_start) = self.paused_at_pts.take()
81-
&& first_pts_received
82-
{
83-
self.pts_offset += pts.saturating_sub(pause_start);
75+
if let Some(pause_start) = self.paused_at_pts.take() {
76+
match state {
77+
QueueState::WaitingForFirstPacket => {
78+
// Input without offset
79+
// If input already resolved elapsed time before
80+
// - we need to add offset
81+
// If input did not resolve elapse time yet
82+
// -
83+
// If input resolved elapsed time during pause
84+
// - we need to know where
85+
//
86+
},
87+
QueueState::Running => {
88+
self.pts_offset += pts.saturating_sub(pause_start);
89+
}
90+
QueueState::Draining => {
91+
// We will clear the queue, send EOS on the next loop
92+
// and state will be reset, so offset does not matter
93+
// after this
94+
},
95+
}
8496
}
85-
true
97+
return true;
8698
}
8799

88100
pub fn is_paused(&self) -> bool {
@@ -92,4 +104,8 @@ impl PauseState {
92104
pub fn pts_offset(&self) -> Duration {
93105
self.pts_offset
94106
}
107+
108+
pub fn paused_at_pts(&self) -> Option<Duration> {
109+
self.paused_at_pts
110+
}
95111
}

smelter-core/src/queue/video_queue.rs

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ impl VideoQueue {
5757

5858
offset_from_start: opts.offset,
5959

60-
pause_state: PauseState::new(),
61-
paused_last_frame: None,
60+
pause_state: VideoPauseState::new(),
6261

6362
state: QueueState::WaitingForFirstPacket,
6463

@@ -83,23 +82,22 @@ impl VideoQueue {
8382
}
8483

8584
pub fn pause_input(&mut self, input_id: &InputId, pts: Duration) {
86-
if let Some(input) = self.inputs.get_mut(input_id) {
87-
input.paused_last_frame = input.queue.front().cloned();
88-
if input.pause_state.pause(pts) {
89-
self.event_emitter
90-
.emit(Event::VideoInputStreamPaused(input_id.clone()));
91-
}
85+
if let Some(input) = self.inputs.get_mut(input_id)
86+
&& input
87+
.pause_state
88+
.pause(pts, &input.state, input.queue.front().cloned())
89+
{
90+
self.event_emitter
91+
.emit(Event::VideoInputStreamPaused(input_id.clone()));
9292
}
9393
}
9494

9595
pub fn resume_input(&mut self, input_id: &InputId, pts: Duration) {
9696
if let Some(input) = self.inputs.get_mut(input_id) {
97-
let first_pts_received = input.shared_state.first_pts().is_some();
98-
if input.pause_state.resume(pts, first_pts_received) && first_pts_received {
97+
if input.pause_state.resume(pts, &input.state) {
9998
self.event_emitter
10099
.emit(Event::VideoInputStreamPlaying(input_id.clone()));
101100
}
102-
input.paused_last_frame = None;
103101
input.queue.clear();
104102
}
105103
}
@@ -176,6 +174,52 @@ struct FrameEvent {
176174
event: PipelineEvent<Frame>,
177175
}
178176

177+
struct VideoPauseState {
178+
inner: PauseState,
179+
paused_frame: Option<Frame>,
180+
}
181+
182+
impl VideoPauseState {
183+
fn new() -> Self {
184+
Self {
185+
inner: PauseState::new(),
186+
paused_frame: None,
187+
}
188+
}
189+
190+
fn pause(&mut self, pts: Duration, state: &QueueState, frame: Option<Frame>) -> bool {
191+
let was_paused = self.inner.is_paused();
192+
let should_emit = self.inner.pause(pts, state);
193+
if !was_paused {
194+
self.paused_frame = frame;
195+
}
196+
should_emit
197+
}
198+
199+
fn resume(&mut self, pts: Duration, state: &QueueState) -> bool {
200+
self.paused_frame = None;
201+
self.inner.resume(pts, state)
202+
}
203+
204+
/// Returns the paused frame as a PipelineEvent with PTS shifted by time elapsed since pause.
205+
fn paused_frame(&self, buffer_pts: Duration) -> Option<PipelineEvent<Frame>> {
206+
self.paused_frame.clone().map(|mut frame| {
207+
if let Some(paused_at) = self.inner.paused_at_pts() {
208+
frame.pts += buffer_pts.saturating_sub(paused_at);
209+
}
210+
PipelineEvent::Data(frame)
211+
})
212+
}
213+
214+
fn is_paused(&self) -> bool {
215+
self.inner.is_paused()
216+
}
217+
218+
fn pts_offset(&self) -> Duration {
219+
self.inner.pts_offset()
220+
}
221+
}
222+
179223
pub struct VideoQueueInput {
180224
/// Frames are PTS ordered where PTS=0 represents beginning of the stream.
181225
queue: VecDeque<Frame>,
@@ -192,10 +236,7 @@ pub struct VideoQueueInput {
192236
sync_point: Instant,
193237
shared_state: SharedState,
194238

195-
pause_state: PauseState,
196-
/// Last frame captured at the moment of pause. Returned with updated PTS
197-
/// on every `get_frame` call while paused.
198-
paused_last_frame: Option<Frame>,
239+
pause_state: VideoPauseState,
199240

200241
state: QueueState,
201242

@@ -209,12 +250,9 @@ impl VideoQueueInput {
209250
/// whether stream is required or not.
210251
fn get_frame(&mut self, buffer_pts: Duration, queue_start_pts: Duration) -> Option<FrameEvent> {
211252
if self.pause_state.is_paused() {
212-
return self.paused_last_frame.clone().map(|mut frame| {
213-
frame.pts = buffer_pts;
214-
FrameEvent {
215-
required: self.required,
216-
event: PipelineEvent::Data(frame),
217-
}
253+
return self.pause_state.paused_frame(buffer_pts).map(|event| FrameEvent {
254+
required: self.required,
255+
event,
218256
});
219257
}
220258

@@ -345,7 +383,7 @@ impl VideoQueueInput {
345383
/// queue start.
346384
fn drop_old_frames_before_start(&mut self) {
347385
loop {
348-
if let QueueState::Draining = self.state {
386+
if let QueueState::Draining = self.state {
349387
self.reset_after_eos();
350388
}
351389
// if offset is defined try_enqueue_frame will always return err

0 commit comments

Comments
 (0)