Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions backend/routers/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ async def receive_tasks():
asyncio.run_coroutine_threadsafe(realtime_transcript_webhook(uid, segments), loop)
continue

# Photos
if header_type == 104:
res = json.loads(bytes(data[4:]).decode("utf-8"))
photos = res.get('photos')
memory_id = res.get('memory_id')
asyncio.run_coroutine_threadsafe(trigger_realtime_integrations(uid, [], memory_id, photos), loop)
continue

# Audio bytes
if header_type == 101:
audiobuffer.extend(data[4:])
Expand Down
59 changes: 58 additions & 1 deletion backend/routers/transcribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from models.transcript_segment import Translation
from models.users import PlanType
from utils.analytics import record_usage
from utils.app_integrations import trigger_external_integrations
from utils.app_integrations import trigger_external_integrations, trigger_realtime_integrations
from utils.apps import is_audio_bytes_app_enabled
from utils.conversations.location import get_google_maps_location
from utils.conversations.process_conversation import process_conversation, retrieve_in_progress_conversation
Expand Down Expand Up @@ -622,6 +622,49 @@ def transcript_send(segments, conversation_id):
in_progress_conversation_id = conversation_id
segment_buffers.extend(segments)

# Photos
photo_buffers = []

def photo_send(photos, conversation_id):
nonlocal photo_buffers
nonlocal in_progress_conversation_id
in_progress_conversation_id = conversation_id
photo_buffers.extend([p.dict() for p in photos])

async def _photo_flush(auto_reconnect: bool = True):
nonlocal photo_buffers
nonlocal in_progress_conversation_id
nonlocal pusher_ws
nonlocal pusher_connected
if pusher_connected and pusher_ws and len(photo_buffers) > 0:
try:
# 104|data
data = bytearray()
data.extend(struct.pack("I", 104))
data.extend(
bytes(
json.dumps({"photos": photo_buffers, "memory_id": in_progress_conversation_id}),
"utf-8",
)
)
photo_buffers = [] # reset
await pusher_ws.send(data)
except ConnectionClosed as e:
print(f"Pusher photos Connection closed: {e}", uid, session_id)
pusher_connected = False
except Exception as e:
print(f"Pusher photos failed: {e}", uid, session_id)
if auto_reconnect and pusher_connected is False:
await connect()

async def photo_consume():
nonlocal websocket_active
nonlocal photo_buffers
while websocket_active:
await asyncio.sleep(1)
if len(photo_buffers) > 0:
await _photo_flush(auto_reconnect=True)

async def _transcript_flush(auto_reconnect: bool = True):
nonlocal segment_buffers
nonlocal in_progress_conversation_id
Expand Down Expand Up @@ -697,6 +740,7 @@ async def audio_bytes_consume():
async def _flush():
await _audio_bytes_flush(auto_reconnect=False)
await _transcript_flush(auto_reconnect=False)
await _photo_flush(auto_reconnect=False)

async def connect():
nonlocal pusher_connected
Expand Down Expand Up @@ -735,12 +779,16 @@ async def close(code: int = 1000):
close,
transcript_send,
transcript_consume,
photo_send,
photo_consume,
audio_bytes_send if audio_bytes_enabled else None,
audio_bytes_consume if audio_bytes_enabled else None,
)

transcript_send = None
transcript_consume = None
photo_send = None
photo_consume = None
audio_bytes_send = None
audio_bytes_consume = None
pusher_close = None
Expand Down Expand Up @@ -878,6 +926,11 @@ async def stream_transcript_process():
if transcript_send is not None and user_has_credits:
transcript_send([segment.dict() for segment in transcript_segments], current_conversation_id)

if photos_to_process:
if photo_send is not None and user_has_credits:
photo_send(photos_to_process, current_conversation_id)

if transcript_segments:
if translation_enabled:
await translate(conversation.transcript_segments[starts:ends], conversation.id)

Expand Down Expand Up @@ -1073,6 +1126,8 @@ async def receive_data(dg_socket1, dg_socket2, soniox_socket, soniox_socket2, sp
pusher_close,
transcript_send,
transcript_consume,
photo_send,
photo_consume,
audio_bytes_send,
audio_bytes_consume,
) = create_pusher_task_handler()
Expand All @@ -1081,6 +1136,8 @@ async def receive_data(dg_socket1, dg_socket2, soniox_socket, soniox_socket2, sp
pusher_tasks.append(asyncio.create_task(pusher_connect()))
if transcript_consume is not None:
pusher_tasks.append(asyncio.create_task(transcript_consume()))
if photo_consume is not None:
pusher_tasks.append(asyncio.create_task(photo_consume()))
if audio_bytes_consume is not None:
pusher_tasks.append(asyncio.create_task(audio_bytes_consume()))

Expand Down
13 changes: 9 additions & 4 deletions backend/utils/app_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ def _single(app: App):
return messages


async def trigger_realtime_integrations(uid: str, segments: list[dict], conversation_id: str | None):
async def trigger_realtime_integrations(uid: str, segments: list[dict], conversation_id: str | None, photos: List = None):
print("trigger_realtime_integrations", uid)
"""REALTIME STREAMING"""
# TODO: don't retrieve token before knowing if to notify
token = notification_db.get_token_only(uid)
_trigger_realtime_integrations(uid, token, segments, conversation_id)
_trigger_realtime_integrations(uid, token, segments, conversation_id, photos)


async def trigger_realtime_audio_bytes(uid: str, sample_rate: int, data: bytearray):
Expand Down Expand Up @@ -284,7 +284,7 @@ def _single(app: App):
return results


def _trigger_realtime_integrations(uid: str, token: str, segments: List[dict], conversation_id: str | None) -> dict:
def _trigger_realtime_integrations(uid: str, token: str, segments: List[dict], conversation_id: str | None, photos: List = None) -> dict:
apps: List[App] = get_available_apps(uid)
filtered_apps = [app for app in apps if app.triggers_realtime() and app.enabled]
if not filtered_apps:
Expand All @@ -304,7 +304,12 @@ def _single(app: App):
url += '?uid=' + uid

try:
response = requests.post(url, json={"session_id": uid, "segments": segments}, timeout=30)
# Build payload with photos if available
payload = {"session_id": uid, "segments": segments}
if photos:
payload["photos"] = [photo.dict() for photo in photos]

response = requests.post(url, json=payload, timeout=30)
if response.status_code != 200:
print(
'trigger_realtime_integrations',
Expand Down