diff --git a/genai/live/live_conversation_audio_with_audio.py b/genai/live/live_conversation_audio_with_audio.py
new file mode 100644
index 0000000000..1072b0f061
--- /dev/null
+++ b/genai/live/live_conversation_audio_with_audio.py
@@ -0,0 +1,127 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START googlegenaisdk_live_conversation_audio_with_audio]
+
+import asyncio
+import base64
+
+from google import genai
+from google.genai.types import (AudioTranscriptionConfig, Blob,
+                                LiveConnectConfig, Modality)
+import numpy as np
+
+from scipy.io import wavfile
+
+# The number of audio frames to send in each chunk.
+CHUNK = 4200
+CHANNELS = 1
+MODEL = "gemini-2.0-flash-live-preview-04-09"
+
+# The audio sample rate expected by the model.
+INPUT_RATE = 16000
+# The audio sample rate of the audio generated by the model.
+OUTPUT_RATE = 24000
+
+# The sample width for 16-bit audio, which is standard for this type of audio data.
+SAMPLE_WIDTH = 2
+
+client = genai.Client()
+
+
+def read_wavefile(filepath: str) -> tuple[str, str]:
+    # Read the .wav file using scipy.io.wavfile.read
+    rate, data = wavfile.read(filepath)
+    # Convert the NumPy array of audio samples back to raw bytes
+    raw_audio_bytes = data.tobytes()
+    # Encode the raw bytes to a base64 string.
+    # The result is decoded from bytes to an ASCII string
+    base64_encoded_data = base64.b64encode(raw_audio_bytes).decode("ascii")
+    mime_type = f"audio/pcm;rate={rate}"
+    return base64_encoded_data, mime_type
+
+
+def write_wavefile(filepath: str, audio_frames: list[bytes], rate: int) -> None:
+    """Writes a list of audio byte frames to a WAV file using scipy."""
+    # Combine the list of byte frames into a single byte string
+    raw_audio_bytes = b"".join(audio_frames)
+
+    # Convert the raw bytes to a NumPy array.
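+    # (np.frombuffer interprets the buffer in place, so no copy of the audio data is made.)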
+    # The sample width is 2 bytes (16-bit), so we use np.int16
+    audio_data = np.frombuffer(raw_audio_bytes, dtype=np.int16)
+
+    # Write the NumPy array to a .wav file
+    wavfile.write(filepath, rate, audio_data)
+    print(f"Model response saved to {filepath}")
+
+
+async def main() -> bool:
+    print("Starting the code")
+    async with client.aio.live.connect(
+        model=MODEL,
+        config=LiveConnectConfig(
+            # Set model responses to be in audio
+            response_modalities=[Modality.AUDIO],
+            # Generate a transcript of the input audio
+            input_audio_transcription=AudioTranscriptionConfig(),
+            # Generate a transcript of the output audio
+            output_audio_transcription=AudioTranscriptionConfig(),
+        ),
+    ) as session:
+
+        async def send() -> None:
+            # Using a local file as an example of live audio input
+            wav_file_path = "hello_gemini_are_you_there.wav"
+            base64_data, mime_type = read_wavefile(wav_file_path)
+            audio_bytes = base64.b64decode(base64_data)
+            await session.send_realtime_input(media=Blob(data=audio_bytes, mime_type=mime_type))
+
+        async def receive() -> None:
+            audio_frames = []
+
+            async for message in session.receive():
+                if message.server_content.input_transcription:
+                    print(message.server_content.model_dump(mode="json", exclude_none=True))
+                if message.server_content.output_transcription:
+                    print(message.server_content.model_dump(mode="json", exclude_none=True))
+                if message.server_content.model_turn:
+                    for part in message.server_content.model_turn.parts:
+                        if part.inline_data.data:
+                            audio_data = part.inline_data.data
+                            audio_frames.append(audio_data)
+
+            if audio_frames:
+                write_wavefile(
+                    "example_model_response.wav",
+                    audio_frames,
+                    OUTPUT_RATE,
+                )
+
+        send_task = asyncio.create_task(send())
+        receive_task = asyncio.create_task(receive())
+        await asyncio.gather(send_task, receive_task)
+        # Example response:
+        # gemini-2.0-flash-live-preview-04-09
+        # {'input_transcription': {'text': 'Hello.'}}
+        # {'output_transcription': {}}
+        # {'output_transcription': {'text': 'Hi'}}
+        # {'output_transcription': {'text': ' there. What can I do for you today?'}}
+        # {'output_transcription': {'finished': True}}
+        # Model response saved to example_model_response.wav
+
+# [END googlegenaisdk_live_conversation_audio_with_audio]
+    return True
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/genai/live/live_ground_ragengine_with_txt.py b/genai/live/live_ground_ragengine_with_txt.py
new file mode 100644
index 0000000000..8fe0b273fa
--- /dev/null
+++ b/genai/live/live_ground_ragengine_with_txt.py
@@ -0,0 +1,66 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
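+
+# Demonstrates grounding Live API responses in a Vertex AI RAG Engine corpus.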
+
+import asyncio
+
+_memory_corpus = "projects/cloud-ai-devrel-softserve/locations/us-central1/ragCorpora/2305843009213693952"
+
+
+async def generate_content(memory_corpus: str) -> list[str]:
+    # [START googlegenaisdk_live_ground_ragengine_with_txt]
+    from google import genai
+    from google.genai.types import (Content, LiveConnectConfig, Modality, Part,
+                                    Retrieval, Tool, VertexRagStore,
+                                    VertexRagStoreRagResource)
+
+    client = genai.Client()
+    model_id = "gemini-2.0-flash-live-preview-04-09"
+    rag_store = VertexRagStore(
+        rag_resources=[
+            VertexRagStoreRagResource(
+                rag_corpus=memory_corpus  # Use a memory corpus if you want to store context.
+            )
+        ],
+        # Set `store_context` to True to let the Live API sink context into your memory corpus.
+        store_context=True,
+    )
+    config = LiveConnectConfig(
+        response_modalities=[Modality.TEXT],
+        tools=[Tool(retrieval=Retrieval(vertex_rag_store=rag_store))],
+    )
+
+    async with client.aio.live.connect(model=model_id, config=config) as session:
+        text_input = "What year did Mariusz Pudzianowski win World's Strongest Man?"
+        print("> ", text_input, "\n")
+
+        await session.send_client_content(
+            turns=Content(role="user", parts=[Part(text=text_input)])
+        )
+
+        response = []
+
+        async for message in session.receive():
+            if message.text:
+                response.append(message.text)
+                continue
+
+        print("".join(response))
+        # Example output:
+        # > What year did Mariusz Pudzianowski win World's Strongest Man?
+        # Mariusz Pudzianowski won World's Strongest Man in 2002, 2003, 2005, 2007, and 2008.
+    # [END googlegenaisdk_live_ground_ragengine_with_txt]
+    return response
+
+
+if __name__ == "__main__":
+    asyncio.run(generate_content(_memory_corpus))
diff --git a/genai/live/live_websocket_audiogen_with_txt.py b/genai/live/live_websocket_audiogen_with_txt.py
index b63e60aaac..0fb00b05b0 100644
--- a/genai/live/live_websocket_audiogen_with_txt.py
+++ b/genai/live/live_websocket_audiogen_with_txt.py
@@ -55,9 +55,7 @@ async def generate_content() -> str:
 
     # Websocket Configuration
     WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com"
-    WEBSOCKET_SERVICE_URL = (
-        f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
-    )
+    WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
 
     # Websocket Authentication
     headers = {
@@ -66,9 +64,7 @@ async def generate_content() -> str:
     }
 
     # Model Configuration
-    model_path = (
-        f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
-    )
+    model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
     model_generation_config = {
         "response_modalities": ["AUDIO"],
         "speech_config": {
@@ -129,7 +125,9 @@ async def generate_content() -> str:
             for part in model_turn["parts"]:
                 if part["inlineData"]["mimeType"] == "audio/pcm":
                     audio_chunk = base64.b64decode(part["inlineData"]["data"])
-                    aggregated_response_parts.append(np.frombuffer(audio_chunk, dtype=np.int16))
+                    aggregated_response_parts.append(
+                        np.frombuffer(audio_chunk, dtype=np.int16)
+                    )
 
             # End of response
             if server_content.get("turnComplete"):
@@ -137,7 +135,9 @@ async def generate_content() -> str:
 
     # Save audio to a file
     if aggregated_response_parts:
-        wavfile.write("output.wav", 24000, np.concatenate(aggregated_response_parts))
+        wavfile.write(
+            "output.wav", 24000, np.concatenate(aggregated_response_parts)
+        )
     # Example response:
     # Setup Response: {'setupComplete': {}}
     # Input: Hello? Gemini are you there?
diff --git a/genai/live/live_websocket_audiotranscript_with_txt.py b/genai/live/live_websocket_audiotranscript_with_txt.py
index 6b769639eb..aa194ebb25 100644
--- a/genai/live/live_websocket_audiotranscript_with_txt.py
+++ b/genai/live/live_websocket_audiotranscript_with_txt.py
@@ -55,9 +55,7 @@ async def generate_content() -> str:
 
     # Websocket Configuration
     WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com"
-    WEBSOCKET_SERVICE_URL = (
-        f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
-    )
+    WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
 
     # Websocket Authentication
     headers = {
@@ -66,9 +64,7 @@ async def generate_content() -> str:
     }
 
     # Model Configuration
-    model_path = (
-        f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
-    )
+    model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
     model_generation_config = {
         "response_modalities": ["AUDIO"],
         "speech_config": {
@@ -142,7 +138,9 @@ async def generate_content() -> str:
             for part in model_turn["parts"]:
                 if part["inlineData"]["mimeType"] == "audio/pcm":
                     audio_chunk = base64.b64decode(part["inlineData"]["data"])
-                    aggregated_response_parts.append(np.frombuffer(audio_chunk, dtype=np.int16))
+                    aggregated_response_parts.append(
+                        np.frombuffer(audio_chunk, dtype=np.int16)
+                    )
 
             # End of response
             if server_content.get("turnComplete"):
diff --git a/genai/live/live_websocket_textgen_with_audio.py b/genai/live/live_websocket_textgen_with_audio.py
index 00923d3931..3241e3ce97 100644
--- a/genai/live/live_websocket_textgen_with_audio.py
+++ b/genai/live/live_websocket_textgen_with_audio.py
@@ -65,9 +65,7 @@ def read_wavefile(filepath: str) -> tuple[str, str]:
 
     # Websocket Configuration
     WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com"
-    WEBSOCKET_SERVICE_URL = (
-        f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
-    )
+    WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
 
     # Websocket Authentication
     headers = {
@@ -76,9 +74,7 @@ def read_wavefile(filepath: str) -> tuple[str, str]:
     }
 
     # Model Configuration
-    model_path = (
-        f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
-    )
+    model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
     model_generation_config = {"response_modalities": ["TEXT"]}
 
     async with connect(WEBSOCKET_SERVICE_URL, additional_headers=headers) as websocket_session:
@@ -105,7 +101,9 @@ def read_wavefile(filepath: str) -> tuple[str, str]:
             return "Error: WebSocket setup failed."
 
         # 3. Send audio message
-        encoded_audio_message, mime_type = read_wavefile("hello_gemini_are_you_there.wav")
+        encoded_audio_message, mime_type = read_wavefile(
+            "hello_gemini_are_you_there.wav"
+        )
 
         # Example audio message: "Hello? Gemini are you there?"
         user_message = {
diff --git a/genai/live/live_websocket_textgen_with_txt.py b/genai/live/live_websocket_textgen_with_txt.py
index 56b6947205..d0c0c3fcce 100644
--- a/genai/live/live_websocket_textgen_with_txt.py
+++ b/genai/live/live_websocket_textgen_with_txt.py
@@ -52,9 +52,7 @@ async def generate_content() -> str:
 
     # Websocket Configuration
     WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com"
-    WEBSOCKET_SERVICE_URL = (
-        f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
-    )
+    WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
 
     # Websocket Authentication
     headers = {
@@ -63,9 +61,7 @@ async def generate_content() -> str:
     }
 
     # Model Configuration
-    model_path = (
-        f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
-    )
+    model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
     model_generation_config = {"response_modalities": ["TEXT"]}
 
     async with connect(WEBSOCKET_SERVICE_URL, additional_headers=headers) as websocket_session:
diff --git a/genai/live/requirements-test.txt b/genai/live/requirements-test.txt
index 1b59fd9d24..1cb0b5ef6a 100644
--- a/genai/live/requirements-test.txt
+++ b/genai/live/requirements-test.txt
@@ -2,3 +2,4 @@ backoff==2.2.1
 google-api-core==2.25.1
 pytest==8.4.1
 pytest-asyncio==1.1.0
+pytest-mock==3.14.0
diff --git a/genai/live/requirements.txt b/genai/live/requirements.txt
index dd1891ee07..d0e355dfbe 100644
--- a/genai/live/requirements.txt
+++ b/genai/live/requirements.txt
@@ -1,7 +1,8 @@
-google-genai==1.28.0
-scipy==1.16.1
-websockets==15.0.1
-numpy==1.26.4
-soundfile==0.12.1
-openai==1.99.1
-setuptools==80.9.0
\ No newline at end of file
+google-genai==1.28.0
+scipy==1.16.1
+websockets==15.0.1
+numpy==1.26.4
+soundfile==0.12.1
+openai==1.99.1
+setuptools==80.9.0
+pyaudio==0.2.14
\ No newline at end of file
diff --git a/genai/live/test_live_examples.py b/genai/live/test_live_examples.py
index f4d25e137e..5cd35b7fc7 100644
--- a/genai/live/test_live_examples.py
+++ b/genai/live/test_live_examples.py
@@ -19,11 +19,14 @@
 import os
 
 import pytest
+import pytest_mock
 
 import live_audiogen_with_txt
 import live_code_exec_with_txt
+import live_conversation_audio_with_audio
 import live_func_call_with_txt
 import live_ground_googsearch_with_txt
+import live_ground_ragengine_with_txt
 import live_structured_ouput_with_txt
 import live_transcribe_with_audio
 import live_txtgen_with_audio
@@ -39,6 +42,77 @@
 
 # os.environ['GOOGLE_CLOUD_PROJECT'] = "add-your-project-name"
 
 
+@pytest.fixture()
+def mock_rag_components(mocker: pytest_mock.MockerFixture) -> None:
+    mock_client_cls = mocker.patch("google.genai.Client")
+
+    class AsyncIterator:
+        def __init__(self) -> None:
+            self.used = False
+
+        def __aiter__(self) -> "AsyncIterator":
+            return self
+
+        async def __anext__(self) -> object:
+            if not self.used:
+                self.used = True
+                return mocker.MagicMock(
+                    text="Mariusz Pudzianowski won in 2002, 2003, 2005, 2007, and 2008."
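+                    # (canned text mirroring the example output documented in the RAG sample)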
+                )
+            raise StopAsyncIteration
+
+    mock_session = mocker.AsyncMock()
+    mock_session.__aenter__.return_value = mock_session
+    mock_session.receive = lambda: AsyncIterator()
+
+    mock_client_cls.return_value.aio.live.connect.return_value = mock_session
+
+
+@pytest.fixture()
+def mock_audio_components(mocker: pytest_mock.MockerFixture) -> None:
+    mock_client_cls = mocker.patch("google.genai.Client")
+
+    class AsyncIterator:
+        def __init__(self) -> None:
+            self.used = 0
+
+        def __aiter__(self) -> "AsyncIterator":
+            return self
+
+        async def __anext__(self) -> object:
+            if self.used == 0:
+                self.used += 1
+                msg = mocker.MagicMock()
+                msg.server_content.input_transcription = {"text": "Hello."}
+                msg.server_content.output_transcription = None
+                msg.server_content.model_turn = None
+                return msg
+            elif self.used == 1:
+                self.used += 1
+                msg = mocker.MagicMock()
+                msg.server_content.input_transcription = None
+                msg.server_content.output_transcription = {"text": "Hi there!"}
+                msg.server_content.model_turn = None
+                return msg
+            elif self.used == 2:
+                self.used += 1
+                msg = mocker.MagicMock()
+                msg.server_content.input_transcription = None
+                msg.server_content.output_transcription = None
+                part = mocker.MagicMock()
+                part.inline_data.data = b"\x00\x01"  # fake audio data
+                msg.server_content.model_turn.parts = [part]
+                return msg
+            raise StopAsyncIteration
+
+    mock_session = mocker.AsyncMock()
+    mock_session.__aenter__.return_value = mock_session
+    mock_session.receive = lambda: AsyncIterator()
+    mock_session.send_realtime_input = mocker.AsyncMock()
+
+    mock_client_cls.return_value.aio.live.connect.return_value = mock_session
+
+
 @pytest.mark.asyncio
 async def test_live_with_text() -> None:
     assert await live_with_txt.generate_content()
@@ -98,3 +172,13 @@ async def test_live_txtgen_with_audio() -> None:
 @pytest.mark.asyncio
 async def test_live_structured_ouput_with_txt() -> None:
     assert live_structured_ouput_with_txt.generate_content()
+
+
+@pytest.mark.asyncio
+async def test_live_ground_ragengine_with_txt(mock_rag_components: None) -> None:
+    assert await live_ground_ragengine_with_txt.generate_content("test")
+
+
+@pytest.mark.asyncio
+async def test_live_conversation_audio_with_audio(mock_audio_components: None) -> None:
+    assert await live_conversation_audio_with_audio.main()
diff --git a/genai/text_generation/test_text_generation_examples.py b/genai/text_generation/test_text_generation_examples.py
index 3381ae7ec8..5277f02f4d 100644
--- a/genai/text_generation/test_text_generation_examples.py
+++ b/genai/text_generation/test_text_generation_examples.py
@@ -136,7 +136,6 @@ def test_textgen_with_youtube_video() -> None:
     response = textgen_with_youtube_video.generate_content()
     assert response
 
-
 # Migrated to Model Optimser Folder
 # def test_model_optimizer_textgen_with_txt() -> None:
 #     os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"