diff --git a/OpenAI-DotNet/Extensions/RealtimeServerEventConverter.cs b/OpenAI-DotNet/Extensions/RealtimeServerEventConverter.cs index 2455ff85..751b3430 100644 --- a/OpenAI-DotNet/Extensions/RealtimeServerEventConverter.cs +++ b/OpenAI-DotNet/Extensions/RealtimeServerEventConverter.cs @@ -29,7 +29,8 @@ _ when type.StartsWith("conversation.item.input_audio_transcription") => root.De "input_audio_buffer.committed" => root.Deserialize(options), "input_audio_buffer.cleared" => root.Deserialize(options), "input_audio_buffer.speech_started" => root.Deserialize(options), - "input_audio_buffer.speech_stopped" => root.Deserialize(options), + "input_audio_buffer.speech_stopped" => root.Deserialize(options), + "output_audio_buffer.started" => root.Deserialize(options), _ when type.StartsWith("response.audio_transcript") => root.Deserialize(options), _ when type.StartsWith("response.audio") => root.Deserialize(), _ when type.StartsWith("response.content_part") => root.Deserialize(options), diff --git a/OpenAI-DotNet/OpenAI-DotNet.csproj b/OpenAI-DotNet/OpenAI-DotNet.csproj index 92ce465d..8c962590 100644 --- a/OpenAI-DotNet/OpenAI-DotNet.csproj +++ b/OpenAI-DotNet/OpenAI-DotNet.csproj @@ -458,5 +458,6 @@ Version 4.4.0 all runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/OpenAI-DotNet/OpenAIClient.cs b/OpenAI-DotNet/OpenAIClient.cs index a459624f..68bb5b29 100644 --- a/OpenAI-DotNet/OpenAIClient.cs +++ b/OpenAI-DotNet/OpenAIClient.cs @@ -75,7 +75,8 @@ public OpenAIClient(OpenAIAuthentication openAIAuthentication = null, OpenAIClie AssistantsEndpoint = new AssistantsEndpoint(this); BatchEndpoint = new BatchEndpoint(this); VectorStoresEndpoint = new VectorStoresEndpoint(this); - RealtimeEndpoint = new RealtimeEndpoint(this); + RealtimeEndpoint = new RealtimeEndpoint(this); + RealtimeEndpointWebRTC = new RealtimeEndpointWebRTC(this); } ~OpenAIClient() => Dispose(false); @@ -219,7 +220,9 @@ private void Dispose(bool disposing) /// public VectorStoresEndpoint VectorStoresEndpoint { get; } - public RealtimeEndpoint RealtimeEndpoint { get; } + public RealtimeEndpoint RealtimeEndpoint { get; } + + public RealtimeEndpointWebRTC RealtimeEndpointWebRTC { get; } #endregion Endpoints diff --git a/OpenAI-DotNet/Realtime/OutputAudioBufferStartedResponse.cs b/OpenAI-DotNet/Realtime/OutputAudioBufferStartedResponse.cs new file mode 100644 index 00000000..63bdb735 --- /dev/null +++ b/OpenAI-DotNet/Realtime/OutputAudioBufferStartedResponse.cs @@ -0,0 +1,33 @@ +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System.Text.Json.Serialization; + +namespace OpenAI.Realtime +{ + public sealed class OutputAudioBufferStartedResponse : BaseRealtimeEvent, IServerEvent + { + /// + [JsonInclude] + [JsonPropertyName("event_id")] + public override string EventId { get; internal set; } + + /// + [JsonInclude] + [JsonPropertyName("type")] + public override string Type { get; protected set; } + + /// + /// Milliseconds since the session started when speech was detected. + /// + [JsonInclude] + [JsonPropertyName("audio_start_ms")] + public int AudioStartMs { get; private set; } + + /// + /// The ID of the user message item that will be created when speech stops. + /// + [JsonInclude] + [JsonPropertyName("item_id")] + public string ItemId { get; private set; } + } +} diff --git a/OpenAI-DotNet/Realtime/RealtimeEndpointWebRTC.cs b/OpenAI-DotNet/Realtime/RealtimeEndpointWebRTC.cs new file mode 100644 index 00000000..b38678a1 --- /dev/null +++ b/OpenAI-DotNet/Realtime/RealtimeEndpointWebRTC.cs @@ -0,0 +1,200 @@ +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using OpenAI.Extensions; +using SIPSorcery.Media; +using SIPSorcery.Net; +using SIPSorceryMedia.Abstractions; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace OpenAI.Realtime +{ + public sealed class RealtimeEndpointWebRTC : OpenAIBaseEndpoint + { + private const string OPENAI_DATACHANNEL_NAME = "oai-events"; + + public readonly AudioEncoder AudioEncoder; + + public readonly AudioFormat AudioFormat; + + internal RealtimeEndpointWebRTC(OpenAIClient client) : base(client) { + AudioEncoder = new AudioEncoder(includeOpus: true); + AudioFormat = AudioEncoder.SupportedFormats.Single(x => x.FormatName == AudioCodecsEnum.OPUS.ToString()); + } + + protected override string Root => "realtime"; + + protected override bool? IsWebSocketEndpoint => false; + + private RTCPeerConnection rtcPeerConnection; + + public event Action OnRtpPacketReceived; + + public event Action OnPeerConnectionConnected; + + public event Action OnPeerConnectionClosedOrFailed; + + /// + /// Creates a new realtime session with the provided options. + /// + /// . + /// Optional, . + /// . + public async Task CreateSessionAsync(SessionConfiguration configuration = null, RTCConfiguration rtcConfiguration = null, CancellationToken cancellationToken = default) + { + rtcPeerConnection = await CreatePeerConnection(rtcConfiguration); + var session = new RealtimeSessionWebRTC(rtcPeerConnection, EnableDebug); + var sessionCreatedTcs = new TaskCompletionSource(); + + try + { + session.OnEventReceived += OnEventReceived; + session.OnError += OnError; + var offerSdp = rtcPeerConnection.createOffer(); + var answerSdp = await SendSdpAsync(configuration?.Model, offerSdp.sdp); + var setAnswerResult = rtcPeerConnection.setRemoteDescription( + new RTCSessionDescriptionInit { sdp = answerSdp, type = RTCSdpType.answer } + ); + + if (setAnswerResult != SetDescriptionResultEnum.OK) + { + sessionCreatedTcs.TrySetException(new Exception("WebRTC SDP negotiation failed")); + } + + var sessionResponse = await sessionCreatedTcs.Task.WithCancellation(cancellationToken).ConfigureAwait(false); + session.Configuration = sessionResponse.SessionConfiguration; + await session.SendAsync(new UpdateSessionRequest(configuration), cancellationToken: cancellationToken).ConfigureAwait(false); + } + finally + { + session.OnError -= OnError; + session.OnEventReceived -= OnEventReceived; + } + + return session; + + void OnError(Exception e) + { + sessionCreatedTcs.SetException(e); + } + + void OnEventReceived(IRealtimeEvent @event) + { + try + { + switch (@event) + { + case RealtimeConversationResponse: + Console.WriteLine("[conversation.created]"); + break; + case SessionResponse sessionResponse: + if (sessionResponse.Type == "session.created") + { + sessionCreatedTcs.TrySetResult(sessionResponse); + } + break; + case RealtimeEventError realtimeEventError: + sessionCreatedTcs.TrySetException(new Exception(realtimeEventError.Error.Message)); + break; + } + } + catch (Exception e) + { + Console.WriteLine(e); + sessionCreatedTcs.TrySetException(e); + } + } + } + + private async Task CreatePeerConnection(RTCConfiguration pcConfig) + { + var peerConnection = new RTCPeerConnection(pcConfig); + MediaStreamTrack audioTrack = new MediaStreamTrack(AudioFormat, MediaStreamStatusEnum.SendRecv); + peerConnection.addTrack(audioTrack); + + var dataChannel = await peerConnection.createDataChannel(OPENAI_DATACHANNEL_NAME); + + if (EnableDebug) + { + peerConnection.onconnectionstatechange += state => Console.WriteLine($"Peer connection connected changed to {state}."); + peerConnection.OnTimeout += mediaType => Console.WriteLine($"Timeout on media {mediaType}."); + peerConnection.oniceconnectionstatechange += state => Console.WriteLine($"ICE connection state changed to {state}."); + + peerConnection.onsignalingstatechange += () => + { + if (peerConnection.signalingState == RTCSignalingState.have_local_offer) + { + Console.WriteLine($"Local SDP:\n{peerConnection.localDescription.sdp}"); + } + else if (peerConnection.signalingState is RTCSignalingState.have_remote_offer or RTCSignalingState.stable) + { + Console.WriteLine($"Remote SDP:\n{peerConnection.remoteDescription?.sdp}"); + } + }; + } + + peerConnection.OnRtpPacketReceived += (ep, mt, rtp) => OnRtpPacketReceived?.Invoke(ep, mt, rtp); + + peerConnection.onconnectionstatechange += (state) => + { + if (state is RTCPeerConnectionState.closed or + RTCPeerConnectionState.failed or + RTCPeerConnectionState.disconnected) + { + OnPeerConnectionClosedOrFailed?.Invoke(); + } + }; + + dataChannel.onopen += () => OnPeerConnectionConnected?.Invoke(); + + dataChannel.onclose += () => OnPeerConnectionClosedOrFailed?.Invoke(); + + return peerConnection; + } + + public void SendAudio(uint durationRtpUnits, byte[] sample) + { + if(rtcPeerConnection != null && rtcPeerConnection.connectionState == RTCPeerConnectionState.connected) + { + rtcPeerConnection.SendAudio(durationRtpUnits, sample); + } + } + + public async Task SendSdpAsync(string model, string offerSdp, CancellationToken cancellationToken = default) + { + model = string.IsNullOrWhiteSpace(model) ? Models.Model.GPT4oRealtime : model; + var queryParameters = new Dictionary(); + + if (client.OpenAIClientSettings.IsAzureOpenAI) + { + queryParameters["deployment"] = model; + } + else + { + queryParameters["model"] = model; + } + + var content = new StringContent(offerSdp, Encoding.UTF8); + content.Headers.ContentType = new MediaTypeHeaderValue("application/sdp"); + + var url = GetUrl(queryParameters: queryParameters); + using var response = await client.Client.PostAsync(GetUrl(queryParameters: queryParameters), content, cancellationToken).ConfigureAwait(false); + + if(!response.IsSuccessStatusCode) + { + var errorBody = await response.Content.ReadAsStringAsync(); + throw new Exception($"Error sending SDP offer {errorBody}"); + } + + var sdpAnswer = await response.ReadAsStringAsync(EnableDebug, content, cancellationToken).ConfigureAwait(false); + return sdpAnswer; + } + } +} diff --git a/OpenAI-DotNet/Realtime/RealtimeSessionWebRTC.cs b/OpenAI-DotNet/Realtime/RealtimeSessionWebRTC.cs new file mode 100644 index 00000000..61e6016c --- /dev/null +++ b/OpenAI-DotNet/Realtime/RealtimeSessionWebRTC.cs @@ -0,0 +1,277 @@ +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using OpenAI.Extensions; +using System; +using System.Collections.Concurrent; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using SIPSorcery.Net; +using System.Linq; +using System.Text; + +namespace OpenAI.Realtime +{ + public sealed class RealtimeSessionWebRTC : IDisposable + { + /// + /// Enable or disable logging. + /// + public bool EnableDebug { get; set; } + + /// + /// The timeout in seconds to wait for a response from the server. + /// + public int EventTimeout { get; set; } = 30; + + /// + /// The configuration options for the session. + /// + public SessionConfiguration Configuration { get; internal set; } + + #region Internal + + internal event Action OnEventReceived; + + internal event Action OnError; + + private readonly RTCPeerConnection peerConnection; + private readonly ConcurrentQueue events = new(); + private readonly object eventLock = new(); + + internal RealtimeSessionWebRTC(RTCPeerConnection pc, bool enableDebug) + { + peerConnection = pc; + EnableDebug = enableDebug; + + SetPeerConnectionEventHandlers(pc); + } + + private void SetPeerConnectionEventHandlers(RTCPeerConnection pc) + { + var dataChannel = pc.DataChannels.FirstOrDefault(); + + if (dataChannel != null) + { + dataChannel.onmessage += OnMessage; + } + } + + private void OnMessage(RTCDataChannel dc, DataChannelPayloadProtocols protocol, byte[] data) + { + var rawMessage = Encoding.UTF8.GetString(data); + + if (EnableDebug) + { + Console.WriteLine(rawMessage); + } + + try + { + var @event = JsonSerializer.Deserialize(rawMessage, OpenAIClient.JsonSerializationOptions); + + lock (eventLock) + { + events.Enqueue(@event); + } + + OnEventReceived?.Invoke(@event); + } + catch (Exception e) + { + Console.WriteLine(e); + OnError?.Invoke(e); + } + } + + ~RealtimeSessionWebRTC() => Dispose(false); + + #region IDisposable + + private bool isDisposed; + + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + private void Dispose(bool disposing) + { + if (!isDisposed && disposing) + { + var dataChannel = peerConnection?.DataChannels.FirstOrDefault(); + + if (dataChannel != null) + { + dataChannel.onmessage -= OnMessage; + } + + peerConnection.Dispose(); + isDisposed = true; + } + } + + #endregion IDisposable + + #endregion + + /// + /// Send a client event to the server. + /// + /// to send to the server. + /// The event to send. + public async void Send(T @event) where T : IClientEvent + => await SendAsync(@event).ConfigureAwait(false); + + /// + /// Send a client event to the server. + /// + /// to send to the server. + /// The event to send. + /// Optional, . + /// Optional, . + /// . + public async Task SendAsync(T @event, CancellationToken cancellationToken = default) where T : IClientEvent + => await SendAsync(@event, null, cancellationToken).ConfigureAwait(false); + + /// + /// Send a client event to the server. + /// + /// to send to the server. + /// The event to send. + /// Optional, . + /// Optional, . + /// . + public async Task SendAsync(T @event, Action sessionEvents, CancellationToken cancellationToken = default) where T : IClientEvent + { + if (peerConnection.connectionState != RTCPeerConnectionState.connected) + { + throw new Exception($"WebRTC connection is not open! {peerConnection.connectionState}"); + } + + IClientEvent clientEvent = @event; + var payload = clientEvent.ToJsonString(); + + if (EnableDebug) + { + if (@event is not InputAudioBufferAppendRequest) + { + Console.WriteLine(payload); + } + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(EventTimeout)); + using var eventCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token); + var tcs = new TaskCompletionSource(); + eventCts.Token.Register(() => tcs.TrySetCanceled()); + OnEventReceived += EventCallback; + + lock (eventLock) + { + events.Enqueue(clientEvent); + } + + var eventId = Guid.NewGuid().ToString("N"); + + if (EnableDebug) + { + if (@event is not InputAudioBufferAppendRequest) + { + Console.WriteLine($"[{eventId}] sending {clientEvent.Type}"); + } + } + + peerConnection.DataChannels.First().send(payload); + + if (EnableDebug) + { + if (@event is not InputAudioBufferAppendRequest) + { + Console.WriteLine($"[{eventId}] sent {clientEvent.Type}"); + } + } + + if (@event is InputAudioBufferAppendRequest) + { + // no response for this client event + return default; + } + + var response = await tcs.Task.WithCancellation(eventCts.Token).ConfigureAwait(false); + + if (EnableDebug) + { + Console.WriteLine($"[{eventId}] received {response.Type}"); + } + + return response; + + void EventCallback(IServerEvent serverEvent) + { + sessionEvents?.Invoke(serverEvent); + + try + { + if (serverEvent is RealtimeEventError serverError) + { + tcs.TrySetException(serverError); + OnEventReceived -= EventCallback; + return; + } + + switch (clientEvent) + { + case UpdateSessionRequest when serverEvent is SessionResponse sessionResponse: + Configuration = sessionResponse.SessionConfiguration; + Complete(); + return; + case InputAudioBufferCommitRequest when serverEvent is InputAudioBufferCommittedResponse: + case InputAudioBufferClearRequest when serverEvent is InputAudioBufferClearedResponse: + case ConversationItemCreateRequest when serverEvent is ConversationItemCreatedResponse: + case ConversationItemTruncateRequest when serverEvent is ConversationItemTruncatedResponse: + case ConversationItemDeleteRequest when serverEvent is ConversationItemDeletedResponse: + Complete(); + return; + case CreateResponseRequest when serverEvent is RealtimeResponse serverResponse: + { + if (serverResponse.Response.Status == RealtimeResponseStatus.InProgress) + { + return; + } + + if (serverResponse.Response.Status != RealtimeResponseStatus.Completed) + { + tcs.TrySetException(new Exception(serverResponse.Response.StatusDetails.Error?.ToString() ?? serverResponse.Response.StatusDetails.Reason)); + } + else + { + Complete(); + } + + break; + } + } + } + catch (Exception e) + { + Console.WriteLine(e); + } + + return; + + void Complete() + { + if (EnableDebug) + { + Console.WriteLine($"{clientEvent.Type} -> {serverEvent.Type}"); + } + + tcs.TrySetResult(serverEvent); + OnEventReceived -= EventCallback; + } + } + } + } +} diff --git a/OpenAI-DotNet/Realtime/SessionConfiguration.cs b/OpenAI-DotNet/Realtime/SessionConfiguration.cs index b0a23871..5e51b9ab 100644 --- a/OpenAI-DotNet/Realtime/SessionConfiguration.cs +++ b/OpenAI-DotNet/Realtime/SessionConfiguration.cs @@ -46,15 +46,16 @@ public SessionConfiguration( : transcriptionModel); VoiceActivityDetectionSettings = turnDetectionSettings ?? new ServerVAD(); - var toolList = tools?.ToList(); + var toolList = tools?.ToList(); + + if (string.IsNullOrWhiteSpace(toolChoice)) + { + ToolChoice = "auto"; + } if (toolList is { Count: > 0 }) { - if (string.IsNullOrWhiteSpace(toolChoice)) - { - ToolChoice = "auto"; - } - else + if (!string.IsNullOrWhiteSpace(toolChoice)) { if (!toolChoice.Equals("none") && !toolChoice.Equals("required") &&