283 changes: 172 additions & 111 deletions callautomation-azure-openai-voice/azureOpenAIService.py
@@ -1,116 +1,177 @@
import json

import openai
from openai import AsyncAzureOpenAI
from openai.types.beta.realtime.session import Session
from openai.resources.beta.realtime.realtime import AsyncRealtimeConnection, AsyncRealtimeConnectionManager

import asyncio
import json
from rtclient import (
    RTLowLevelClient,
    SessionUpdateMessage,
    ServerVAD,
    SessionUpdateParams,
    InputAudioBufferAppendMessage,
    InputAudioTranscription,
)
import random

from azure.core.credentials import AzureKeyCredential
active_websocket = None
answer_prompt_system_template = "You are an AI assistant that helps people find information."
AZURE_OPENAI_SERVICE_ENDPOINT = "<AZURE_OPENAI_SERVICE_ENDPOINT>"
AZURE_OPENAI_SERVICE_KEY = "<AZURE_OPENAI_SERVICE_KEY>"
AZURE_OPENAI_DEPLOYMENT_MODEL_NAME = "<AZURE_OPENAI_DEPLOYMENT_MODEL_NAME>"

async def start_conversation():
    global client
    client = RTLowLevelClient(url=AZURE_OPENAI_SERVICE_ENDPOINT, key_credential=AzureKeyCredential(AZURE_OPENAI_SERVICE_KEY), azure_deployment=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME)
    await client.connect()
    await client.send(
        SessionUpdateMessage(
            session=SessionUpdateParams(
                instructions=answer_prompt_system_template,
                turn_detection=ServerVAD(type="server_vad"),
                voice='shimmer',
                input_audio_format='pcm16',
                output_audio_format='pcm16',
                input_audio_transcription=InputAudioTranscription(model="whisper-1")
            )

AZURE_OPENAI_API_ENDPOINT = '<AZURE_OPENAI_SERVICE_ENDPOINT>'
AZURE_OPENAI_API_VERSION = "2024-10-01-preview"
AZURE_OPENAI_API_KEY = '<AZURE_OPENAI_SERVICE_KEY>'
AZURE_OPENAI_DEPLOYMENT_NAME = '<AZURE_OPENAI_DEPLOYMENT_MODEL_NAME>'
SAMPLE_RATE = 24000

def session_config():
    """Builds the realtime session configuration, including a randomly chosen voice."""
    voices = ['alloy', 'ash', 'ballad', 'coral', 'echo', 'sage', 'shimmer', 'verse']
    # For details on the available parameters, see: https://platform.openai.com/docs/api-reference/realtime-sessions/create
    SESSION_CONFIG = {
        "input_audio_transcription": {
            "model": "whisper-1",
        },
        "turn_detection": {
            "threshold": 0.4,
            "silence_duration_ms": 600,
            "type": "server_vad"
        },
        "instructions": "Your name is Sam, you work for Contoso Services. You're a helpful, calm and cheerful agent who responds in a calm British accent, but can also speak in any language or accent. Always start the conversation with a cheery hello, stating your name and who you work for!",
        "voice": random.choice(voices),
        "modalities": ["text", "audio"]  # required to solicit the initial welcome message
    }
    return SESSION_CONFIG

class OpenAIRTHandler():
    incoming_websocket = None
    client = None
    connection = None
    connection_manager = None
    welcomed = False

    def __init__(self) -> None:
        self.client = AsyncAzureOpenAI(
            azure_endpoint=AZURE_OPENAI_API_ENDPOINT,
            azure_deployment=AZURE_OPENAI_DEPLOYMENT_NAME,
            api_key=AZURE_OPENAI_API_KEY,
            api_version=AZURE_OPENAI_API_VERSION,
        )
        self.connection_manager = self.client.beta.realtime.connect(
            model="gpt-4o-realtime-preview"  # Replace with your deployed realtime model id on Azure OpenAI.
        )

    def __exit__(self, exc_type, exc_value, traceback):
        self.connection.close()
        self.incoming_websocket.close()

    # start_conversation > start_client
    async def start_client(self):
        self.connection = await self.connection_manager.enter()
        await self.connection.session.update(session=session_config())
        await self.connection.response.create()
        # Run an async task to listen for and receive OAI messages.
        asyncio.create_task(self.receive_oai_messages())

    # send_audio_to_external_ai > audio_to_oai
    async def audio_to_oai(self, audioData: str):
        await self.connection.input_audio_buffer.append(audio=audioData)

    # receive_messages > receive_oai_messages
    async def receive_oai_messages(self):
        async for event in self.connection:
            if event is None:
                continue
            match event.type:
                case "session.created":
                    print("Session Created Message")
                    print(f"  Session Id: {event.session.id}")
                case "error":
                    print(f"  Error: {event.error}")
                case "input_audio_buffer.cleared":
                    print("Input Audio Buffer Cleared Message")
                case "input_audio_buffer.speech_started":
                    print(f"Voice activity detection started at {event.audio_start_ms} [ms]")
                    await self.stop_audio()
                case "input_audio_buffer.speech_stopped":
                    pass
                case "conversation.item.input_audio_transcription.completed":
                    print(f"  User:-- {event.transcript}")
                case "conversation.item.input_audio_transcription.failed":
                    print(f"  Error: {event.error}")
                case "response.done":
                    print("Response Done Message")
                    print(f"  Response Id: {event.response.id}")
                    if event.response.status_details:
                        print(f"  Status Details: {event.response.status_details.model_dump_json()}")
                case "response.audio_transcript.done":
                    print(f"  AI:-- {event.transcript}")
                case "response.audio.delta":
                    await self.oai_to_acs(event.delta)
                case _:
                    pass

    # init_websocket -> init_incoming_websocket (incoming)
    async def init_incoming_websocket(self, socket):
        self.incoming_websocket = socket

    # receive_audio_for_outbound > oai_to_acs
    async def oai_to_acs(self, data):
        try:
            data = {
                "Kind": "AudioData",
                "AudioData": {
                    "Data": data
                },
                "StopAudio": None
            }

            # Serialize the server streaming data
            serialized_data = json.dumps(data)
            await self.send_message(serialized_data)

        except Exception as e:
            print(e)

    # Stop OAI talking when voice activity from the user is detected.
    async def stop_audio(self):
        stop_audio_data = {
            "Kind": "StopAudio",
            "AudioData": None,
            "StopAudio": {}
        }

        json_data = json.dumps(stop_audio_data)
        await self.send_message(json_data)

    # send_message > send_message
    async def send_message(self, message: str):
        try:
            await self.incoming_websocket.send(message)
        except Exception as e:
            print(f"Failed to send message: {e}")

    async def send_welcome(self):
        if not self.welcomed:
            await self.connection.conversation.item.create(
                item={
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": "Hi! What's your name and who do you work for?"}],
                }
            )
        )
    )

    asyncio.create_task(receive_messages(client))

async def send_audio_to_external_ai(audioData: str):
    await client.send(message=InputAudioBufferAppendMessage(type="input_audio_buffer.append", audio=audioData, _is_azure=True))

async def receive_messages(client: RTLowLevelClient):
    while not client.closed:
        message = await client.recv()
        if message is None:
            continue
        match message.type:
            case "session.created":
                print("Session Created Message")
                print(f"  Session Id: {message.session.id}")
                pass
            case "error":
                print(f"  Error: {message.error}")
                pass
            case "input_audio_buffer.cleared":
                print("Input Audio Buffer Cleared Message")
                pass
            case "input_audio_buffer.speech_started":
                print(f"Voice activity detection started at {message.audio_start_ms} [ms]")
                await stop_audio()
                pass
            case "input_audio_buffer.speech_stopped":
                pass
            case "conversation.item.input_audio_transcription.completed":
                print(f"  User:-- {message.transcript}")
            case "conversation.item.input_audio_transcription.failed":
                print(f"  Error: {message.error}")
            case "response.done":
                print("Response Done Message")
                print(f"  Response Id: {message.response.id}")
                if message.response.status_details:
                    print(f"  Status Details: {message.response.status_details.model_dump_json()}")
            case "response.audio_transcript.done":
                print(f"  AI:-- {message.transcript}")
            case "response.audio.delta":
                await receive_audio_for_outbound(message.delta)
                pass
            case _:
                pass

async def init_websocket(socket):
    global active_websocket
    active_websocket = socket

async def receive_audio_for_outbound(data):
    try:
        data = {
            "Kind": "AudioData",
            "AudioData": {
                "Data": data
            },
            "StopAudio": None
        }

        # Serialize the server streaming data
        serialized_data = json.dumps(data)
        await send_message(serialized_data)

    except Exception as e:
        print(e)

async def stop_audio():
    stop_audio_data = {
        "Kind": "StopAudio",
        "AudioData": None,
        "StopAudio": {}
    }

    json_data = json.dumps(stop_audio_data)
    await send_message(json_data)

async def send_message(message: str):
    global active_websocket
    try:
        await active_websocket.send(message)
    except Exception as e:
        print(f"Failed to send message: {e}")
            await self.connection.response.create()
            self.welcomed = True

    # mediaStreamingHandler.process_websocket_message_async -> acs_to_oai
    async def acs_to_oai(self, stream_data):
        try:
            data = json.loads(stream_data)
            kind = data['kind']
            if kind == "AudioData":
                audio_data = data["audioData"]["data"]
                await self.audio_to_oai(audio_data)
        except Exception as e:
            print(f'Error processing WebSocket message: {e}')
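For reference, here is a minimal sketch of the JSON frames this handler exchanges over the ACS media-streaming WebSocket, as implied by `acs_to_oai`, `oai_to_acs`, and `stop_audio` above. The silent PCM16 payload is a made-up placeholder; note that inbound frames use lower-case keys while outbound frames use capitalized ones:

```python
import base64
import json

# Inbound frame from ACS (what acs_to_oai parses): lower-case keys.
inbound = json.dumps({
    "kind": "AudioData",
    "audioData": {"data": base64.b64encode(b"\x00\x00" * 160).decode()},
})

# Outbound audio frame to ACS (what oai_to_acs builds): capitalized keys.
outbound_audio = json.dumps({
    "Kind": "AudioData",
    "AudioData": {"Data": base64.b64encode(b"\x00\x00" * 160).decode()},
    "StopAudio": None,
})

# Outbound barge-in frame (what stop_audio sends) to halt playback.
outbound_stop = json.dumps({"Kind": "StopAudio", "AudioData": None, "StopAudio": {}})

print(inbound, outbound_audio, outbound_stop, sep="\n")
```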
15 changes: 8 additions & 7 deletions callautomation-azure-openai-voice/main.py
@@ -15,8 +15,7 @@
import uuid
from azure.core.messaging import CloudEvent

from azureOpenAIService import init_websocket, start_conversation
from mediaStreamingHandler import process_websocket_message_async
from azureOpenAIService import OpenAIRTHandler
from threading import Thread

# Your ACS resource connection string
@@ -110,14 +109,16 @@ async def callbacks(contextId):
# WebSocket.
@app.websocket('/ws')
async def ws():
    handler = OpenAIRTHandler()
    print("Client connected to WebSocket")
    await init_websocket(websocket)
    await start_conversation()
    while True:
    await handler.init_incoming_websocket(websocket)
    await handler.start_client()
    while websocket:
        try:
            # Receive data from the client
            data = await websocket.receive()
            await process_websocket_message_async(data)
            await handler.acs_to_oai(data)
            await handler.send_welcome()
        except Exception as e:
            print(f"WebSocket connection closed: {e}")
            break
@@ -128,7 +129,7 @@ def home():

if __name__ == '__main__':
    app.logger.setLevel(INFO)
    app.run(port=8080)
    app.run(port=8000)
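As a quick local sanity check of the `/ws` endpoint, a hypothetical client like the one below (assuming the third-party `websockets` package and the app listening on port 8000) exercises the handler's receive path with one frame of silence:

```python
import asyncio
import base64
import json

import websockets  # hypothetical test dependency, not in requirements.txt

async def main():
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        frame = {
            "kind": "AudioData",
            "audioData": {"data": base64.b64encode(b"\x00\x00" * 160).decode()},
        }
        await ws.send(json.dumps(frame))
        print(await ws.recv())  # expect an AudioData or StopAudio frame back

asyncio.run(main())
```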



12 changes: 0 additions & 12 deletions callautomation-azure-openai-voice/mediaStreamingHandler.py

This file was deleted.

5 changes: 2 additions & 3 deletions callautomation-azure-openai-voice/readme.md
@@ -25,7 +25,6 @@ This is a sample application demonstrated during Microsoft Ignite 2024. It highl
Create and activate a Python virtual environment, then install the required packages using the following command
```
pip install -r requirements.txt
pip install -r ./aoai-whl/rtclient-0.5.1-py3-none-any.whl
```
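If you don't have a virtual environment yet, a typical setup (assuming Python 3.10+ for the `match` statements in this sample, and a POSIX shell) looks like this:

```bash
python -m venv .venv
source .venv/bin/activate   # on Windows: .venv\Scripts\activate
pip install -r requirements.txt
```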

### Setup and host your Azure DevTunnel
@@ -34,7 +33,7 @@

```bash
devtunnel create --allow-anonymous
devtunnel port create -p 8080
devtunnel port create -p 8000
devtunnel host
```

@@ -54,7 +53,7 @@ Open `azureOpenAIService.py` file to configure the following settings
## Run app locally

1. Navigate to the `callautomation-azure-openai-voice` folder and run `main.py` in debug mode, or use the command `python ./main.py` to run it from PowerShell, Command Prompt, or a Unix terminal
2. A browser should pop up with the below page. If not, navigate to `http://localhost:8080/` or your dev tunnel URL.
2. A browser should pop up with the below page. If not, navigate to `http://localhost:8000/` or your dev tunnel URL.
3. Register an EventGrid webhook for the IncomingCall event (`https://<devtunnelurl>/api/incomingCall`) that points to your devtunnel URI. Instructions [here](https://learn.microsoft.com/en-us/azure/communication-services/concepts/call-automation/incoming-call-notification).
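One way to register that webhook is with the Azure CLI; the subscription name and resource ID below are placeholders you would swap for your own ACS resource:

```bash
az eventgrid event-subscription create \
  --name incoming-call-sub \
  --source-resource-id "/subscriptions/<SUBSCRIPTION_ID>/resourceGroups/<RESOURCE_GROUP>/providers/Microsoft.Communication/communicationServices/<ACS_RESOURCE_NAME>" \
  --endpoint "https://<devtunnelurl>/api/incomingCall" \
  --included-event-types Microsoft.Communication.IncomingCall
```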

Once that's completed, you should have a running application. The best way to test it is to place a call to your ACS phone number and talk to your intelligent agent.
4 changes: 3 additions & 1 deletion callautomation-azure-openai-voice/requirements.txt
@@ -1,4 +1,6 @@
Quart>=0.19.6
azure-eventgrid==4.11.0
aiohttp>= 3.11.9
azure-communication-callautomation==1.4.0b1
azure-communication-callautomation==1.4.0b1
openai
openai[realtime]