Closed
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -85,6 +85,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Changed

+- Runner: `body` property moved to `RunnerArguments` base class as all transports
+  should support arbitrary body data as part of a request.
+
 - `CartesiaSTTService` now inherits from `WebsocketSTTService`.

- Package upgrades:
@@ -106,6 +109,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Fixed

+- Fixed an issue where the `SmallWebRTCRequest` dataclass in runner would scrub
+  arbitrary request data from client due to camelCase typing. This fixes data
+  passthrough for JS clients where `APIRequest` is used.
+
 - Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
   to a mismatch in the _handle_transcription method's signature.

4 changes: 3 additions & 1 deletion src/pipecat/runner/run.py
@@ -260,7 +260,9 @@ async def offer(request: SmallWebRTCRequest, background_tasks: BackgroundTasks):
         # Prepare runner arguments with the callback to run your bot
         async def webrtc_connection_callback(connection):
             bot_module = _get_bot_module()
-            runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection)
+            runner_args = SmallWebRTCRunnerArguments(
+                webrtc_connection=connection, body=request.request_data
+            )
             background_tasks.add_task(bot_module.bot, runner_args)

# Delegate handling to SmallWebRTCRequestHandler
5 changes: 3 additions & 2 deletions src/pipecat/runner/types.py
@@ -24,10 +24,13 @@ class RunnerArguments:
     handle_sigterm: bool = field(init=False)
     pipeline_idle_timeout_secs: int = field(init=False)

+    body: Optional[Any] = field(default_factory=dict)
+
     def __post_init__(self):
         self.handle_sigint = False
         self.handle_sigterm = False
         self.pipeline_idle_timeout_secs = 300
+        self.body = self.body or {}

Contributor

I think it makes sense to move the "body" here, since it seems it will be common to all transports and will allow receiving any custom data from the user.

And we will need to do the same inside the base image.

Tagging @mark for a second opinion on this, since he’s worked a lot more with the runners.

Contributor Author

➕ . Also, inconsistency in transport params for common properties like this can make building multi-transport bot files tricky. For me at least, this makes sense to have as a default.

Contributor

This creates an inheritance issue, resulting in a TypeError.

The new body field is given a default in RunnerArguments, while the DailyRunnerArguments subclass declares a required (non-default) field, room_url. Dataclasses reject a non-default field that follows a default one. To solve this, we would need to either:

  • defer initializing body (e.g. init=False) (this is breaking, I think)
  • Make body kw_only=True (Potentially breaking if args provided positionally)
  • Initialize body in the SmallWebRTCRunnerArguments (not breaking)

Maybe this is the best outcome?

@dataclass
class RunnerArguments:
    handle_sigint: bool = field(init=False)
    handle_sigterm: bool = field(init=False)
    pipeline_idle_timeout_secs: int = field(init=False)
    body: dict[str, Any] = field(default_factory=dict, kw_only=True)

    def __post_init__(self):
        self.handle_sigint = False
        self.handle_sigterm = False
        self.pipeline_idle_timeout_secs = 300

This makes body keyword-only, which eliminates ambiguity across transports. (It is semi-breaking, though.)

Any changes made here need to be made to pipecatcloud types, as well.

Contributor

The least breaking option is to just add body to SmallWebRTCRunnerArguments, but then you keep the transport inconsistency (for positional args only).

@jptaylor what do you mean by:

Also, inconsistency in transport params for common properties like this can make building multi-transport bot files tricky.

Is it if you're using positional args?

Contributor

FYI: @aconchillo just proposed this change, coincidentally.

Contributor Author

@markbackman I can see how this is breaking. I will wait for @aconchillo to weigh in.

Is it if you're using positional args?

Perhaps best to ignore my input here, as I'm clearly missing important context based on your comment (I didn't consider the positional args case, for example!). My point about multi-transport was that some bot methods can receive args that are either Daily or SmallWebRTC, so having a tight base class definition for request data seems like a good idea. If body were on RunnerArguments, for example, I would not have had to update the definition of SmallWebRTCRunnerArguments as part of this PR.

Contributor

Got it. In the case of multi-transport, you can just use the RunnerArguments type. This already works well and is employed in a number of our examples. Two different ways this is done:

You would only use the specific type if you have a single transport type.



@dataclass
@@ -42,7 +45,6 @@ class DailyRunnerArguments(RunnerArguments):

     room_url: str
     token: Optional[str] = None
-    body: Optional[Any] = field(default_factory=dict)


@dataclass
@@ -55,7 +57,6 @@ class WebSocketRunnerArguments(RunnerArguments):
     """

     websocket: WebSocket
-    body: Optional[Any] = field(default_factory=dict)


@dataclass
32 changes: 16 additions & 16 deletions src/pipecat/services/aws/nova_sonic/llm.py
@@ -627,7 +627,7 @@ async def _send_prompt_start_event(self, tools: List[Any]):
else ""
)

-prompt_start = f'''
+prompt_start = f"""
{{
"event": {{
"promptStart": {{
@@ -647,14 +647,14 @@ async def _send_prompt_start_event(self, tools: List[Any]):
}}
}}
}}
-'''
+"""
await self._send_client_event(prompt_start)

async def _send_audio_input_start_event(self):
if not self._prompt_name:
return

-audio_content_start = f'''
+audio_content_start = f"""
{{
"event": {{
"contentStart": {{
@@ -674,7 +674,7 @@ async def _send_audio_input_start_event(self):
}}
}}
}}
-'''
+"""
await self._send_client_event(audio_content_start)

async def _send_text_event(self, text: str, role: Role):
@@ -683,7 +683,7 @@ async def _send_text_event(self, text: str, role: Role):

content_name = str(uuid.uuid4())

-text_content_start = f'''
+text_content_start = f"""
{{
"event": {{
"contentStart": {{
@@ -698,11 +698,11 @@ async def _send_text_event(self, text: str, role: Role):
}}
}}
}}
-'''
+"""
await self._send_client_event(text_content_start)

escaped_text = json.dumps(text) # includes quotes
-text_input = f'''
+text_input = f"""
{{
"event": {{
"textInput": {{
@@ -712,10 +712,10 @@ async def _send_text_event(self, text: str, role: Role):
}}
}}
}}
-'''
+"""
await self._send_client_event(text_input)

-text_content_end = f'''
+text_content_end = f"""
{{
"event": {{
"contentEnd": {{
@@ -724,15 +724,15 @@ async def _send_text_event(self, text: str, role: Role):
}}
}}
}}
-'''
+"""
await self._send_client_event(text_content_end)

async def _send_user_audio_event(self, audio: bytes):
if not self._stream:
return

blob = base64.b64encode(audio)
-audio_event = f'''
+audio_event = f"""
{{
"event": {{
"audioInput": {{
@@ -742,22 +742,22 @@ async def _send_user_audio_event(self, audio: bytes):
}}
}}
}}
-'''
+"""
await self._send_client_event(audio_event)

async def _send_session_end_events(self):
if not self._stream or not self._prompt_name:
return

-prompt_end = f'''
+prompt_end = f"""
{{
"event": {{
"promptEnd": {{
"promptName": "{self._prompt_name}"
}}
}}
}}
-'''
+"""
await self._send_client_event(prompt_end)

session_end = """
@@ -775,7 +775,7 @@ async def _send_tool_result(self, tool_call_id, result):

content_name = str(uuid.uuid4())

-result_content_start = f'''
+result_content_start = f"""
{{
"event": {{
"contentStart": {{
@@ -794,7 +794,7 @@ async def _send_tool_result(self, tool_call_id, result):
}}
}}
}}
-'''
+"""
await self._send_client_event(result_content_start)

result_content = json.dumps(
7 changes: 7 additions & 0 deletions src/pipecat/transports/smallwebrtc/request_handler.py
@@ -39,6 +39,13 @@ class SmallWebRTCRequest:
     restart_pc: Optional[bool] = None
     request_data: Optional[Any] = None
+
+    @classmethod
+    def from_dict(cls, data: dict):
+        """Accept both snake_case and camelCase for the request_data field."""
+        if "requestData" in data and "request_data" not in data:
+            data["request_data"] = data.pop("requestData")
+        return cls(**data)

Contributor

What's the reason for allowing both options?

Contributor

I think the main reason is to not break our web SDK, which currently sends it as "requestData".
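
To make the compatibility concern concrete, here is a small sketch of accepting either key. DemoRequest is a hypothetical stand-in that keeps only the two fields visible in this diff. Unlike the method in the PR, this variant copies the incoming dict instead of mutating it and lets request_data win when both keys are present; both choices are assumptions for the example, not the project's behavior.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class DemoRequest:
    """Hypothetical stand-in for the request dataclass in this diff."""

    restart_pc: Optional[bool] = None
    request_data: Optional[Any] = None

    @classmethod
    def from_dict(cls, data: dict) -> "DemoRequest":
        payload = dict(data)  # shallow copy so the caller's dict is untouched
        camel_value = payload.pop("requestData", None)
        # Keep an explicit snake_case value; otherwise fall back to camelCase.
        payload.setdefault("request_data", camel_value)
        return cls(**payload)


incoming = {"requestData": {"user_id": 42}}
from_camel = DemoRequest.from_dict(incoming)
from_snake = DemoRequest.from_dict({"request_data": {"user_id": 7}})
print(from_camel, from_snake)
```

A JS client sending "requestData" and a Python client sending "request_data" would both end up populating the same field.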



@dataclass
class IceCandidate: