diff --git a/backend/routers/pusher.py b/backend/routers/pusher.py
index f86a2c3e42..ee46d07a2f 100644
--- a/backend/routers/pusher.py
+++ b/backend/routers/pusher.py
@@ -63,6 +63,14 @@ async def receive_tasks():
                     asyncio.run_coroutine_threadsafe(realtime_transcript_webhook(uid, segments), loop)
                     continue
 
+                # Photos
+                if header_type == 104:
+                    res = json.loads(bytes(data[4:]).decode("utf-8"))
+                    photos = res.get('photos')
+                    memory_id = res.get('memory_id')
+                    asyncio.run_coroutine_threadsafe(trigger_realtime_integrations(uid, [], memory_id, photos), loop)
+                    continue
+
                 # Audio bytes
                 if header_type == 101:
                     audiobuffer.extend(data[4:])
diff --git a/backend/routers/transcribe.py b/backend/routers/transcribe.py
index 69f442ed5b..2751d0dd5a 100644
--- a/backend/routers/transcribe.py
+++ b/backend/routers/transcribe.py
@@ -42,7 +42,7 @@
 from models.transcript_segment import Translation
 from models.users import PlanType
 from utils.analytics import record_usage
-from utils.app_integrations import trigger_external_integrations
+from utils.app_integrations import trigger_external_integrations, trigger_realtime_integrations
 from utils.apps import is_audio_bytes_app_enabled
 from utils.conversations.location import get_google_maps_location
 from utils.conversations.process_conversation import process_conversation, retrieve_in_progress_conversation
@@ -622,6 +622,49 @@ def transcript_send(segments, conversation_id):
             in_progress_conversation_id = conversation_id
             segment_buffers.extend(segments)
 
+        # Photos
+        photo_buffers = []
+
+        def photo_send(photos, conversation_id):
+            nonlocal photo_buffers
+            nonlocal in_progress_conversation_id
+            in_progress_conversation_id = conversation_id
+            photo_buffers.extend([p.dict() for p in photos])
+
+        async def _photo_flush(auto_reconnect: bool = True):
+            nonlocal photo_buffers
+            nonlocal in_progress_conversation_id
+            nonlocal pusher_ws
+            nonlocal pusher_connected
+            if pusher_connected and pusher_ws and len(photo_buffers) > 0:
+                try:
+                    # 104|data
+                    data = bytearray()
+                    data.extend(struct.pack("I", 104))
+                    data.extend(
+                        bytes(
+                            json.dumps({"photos": photo_buffers, "memory_id": in_progress_conversation_id}),
+                            "utf-8",
+                        )
+                    )
+                    photo_buffers = []  # reset
+                    await pusher_ws.send(data)
+                except ConnectionClosed as e:
+                    print(f"Pusher photos Connection closed: {e}", uid, session_id)
+                    pusher_connected = False
+                except Exception as e:
+                    print(f"Pusher photos failed: {e}", uid, session_id)
+            if auto_reconnect and pusher_connected is False:
+                await connect()
+
+        async def photo_consume():
+            nonlocal websocket_active
+            nonlocal photo_buffers
+            while websocket_active:
+                await asyncio.sleep(1)
+                if len(photo_buffers) > 0:
+                    await _photo_flush(auto_reconnect=True)
+
         async def _transcript_flush(auto_reconnect: bool = True):
             nonlocal segment_buffers
             nonlocal in_progress_conversation_id
@@ -697,6 +740,7 @@ async def audio_bytes_consume():
         async def _flush():
             await _audio_bytes_flush(auto_reconnect=False)
             await _transcript_flush(auto_reconnect=False)
+            await _photo_flush(auto_reconnect=False)
 
         async def connect():
             nonlocal pusher_connected
@@ -735,12 +779,16 @@ async def close(code: int = 1000):
             close,
             transcript_send,
             transcript_consume,
+            photo_send,
+            photo_consume,
             audio_bytes_send if audio_bytes_enabled else None,
             audio_bytes_consume if audio_bytes_enabled else None,
         )
 
     transcript_send = None
     transcript_consume = None
+    photo_send = None
+    photo_consume = None
     audio_bytes_send = None
     audio_bytes_consume = None
     pusher_close = None
@@ -878,6 +926,11 @@ async def stream_transcript_process():
                 if transcript_send is not None and user_has_credits:
                     transcript_send([segment.dict() for segment in transcript_segments], current_conversation_id)
 
+            if photos_to_process:
+                if photo_send is not None and user_has_credits:
+                    photo_send(photos_to_process, current_conversation_id)
+
+            if transcript_segments:
                 if translation_enabled:
                     await translate(conversation.transcript_segments[starts:ends], conversation.id)
 
@@ -1073,6 +1126,8 @@ async def receive_data(dg_socket1, dg_socket2, soniox_socket, soniox_socket2, sp
             pusher_close,
             transcript_send,
             transcript_consume,
+            photo_send,
+            photo_consume,
             audio_bytes_send,
             audio_bytes_consume,
         ) = create_pusher_task_handler()
@@ -1081,6 +1136,8 @@ async def receive_data(dg_socket1, dg_socket2, soniox_socket, soniox_socket2, sp
         pusher_tasks.append(asyncio.create_task(pusher_connect()))
         if transcript_consume is not None:
             pusher_tasks.append(asyncio.create_task(transcript_consume()))
+        if photo_consume is not None:
+            pusher_tasks.append(asyncio.create_task(photo_consume()))
         if audio_bytes_consume is not None:
             pusher_tasks.append(asyncio.create_task(audio_bytes_consume()))
 
diff --git a/backend/utils/app_integrations.py b/backend/utils/app_integrations.py
index 142a6aa7e2..3fcce6411c 100644
--- a/backend/utils/app_integrations.py
+++ b/backend/utils/app_integrations.py
@@ -144,12 +144,12 @@ def _single(app: App):
     return messages
 
 
-async def trigger_realtime_integrations(uid: str, segments: list[dict], conversation_id: str | None):
+async def trigger_realtime_integrations(uid: str, segments: list[dict], conversation_id: str | None, photos: List | None = None):
     print("trigger_realtime_integrations", uid)
     """REALTIME STREAMING"""
     # TODO: don't retrieve token before knowing if to notify
     token = notification_db.get_token_only(uid)
-    _trigger_realtime_integrations(uid, token, segments, conversation_id)
+    _trigger_realtime_integrations(uid, token, segments, conversation_id, photos)
 
 
 async def trigger_realtime_audio_bytes(uid: str, sample_rate: int, data: bytearray):
@@ -284,7 +284,7 @@ def _single(app: App):
     return results
 
 
-def _trigger_realtime_integrations(uid: str, token: str, segments: List[dict], conversation_id: str | None) -> dict:
+def _trigger_realtime_integrations(uid: str, token: str, segments: List[dict], conversation_id: str | None, photos: List | None = None) -> dict:
     apps: List[App] = get_available_apps(uid)
     filtered_apps = [app for app in apps if app.triggers_realtime() and app.enabled]
     if not filtered_apps:
@@ -304,7 +304,14 @@ def _single(app: App):
             url += '?uid=' + uid
 
         try:
-            response = requests.post(url, json={"session_id": uid, "segments": segments}, timeout=30)
+            # Build payload with photos if available
+            payload = {"session_id": uid, "segments": segments}
+            if photos:
+                # Photos relayed by the pusher were serialised with .dict() and JSON-decoded,
+                # so they arrive as plain dicts; only model instances still need .dict().
+                payload["photos"] = [photo if isinstance(photo, dict) else photo.dict() for photo in photos]
+
+            response = requests.post(url, json=payload, timeout=30)
             if response.status_code != 200:
                 print(
                     'trigger_realtime_integrations',