From c928344cb9c3fbbd466202f616a9d08ac84cd685 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 24 Jul 2025 00:36:22 +0000 Subject: [PATCH 01/18] Add Managed Kafka Connect code samples for clusters * Adds code examples for creating, deleting, getting, listing and updating Managed Kafka Connect clusters --- .../connect/clusters/clusters_test.py | 176 ++++++++++++++++++ .../clusters/create_connect_cluster.py | 84 +++++++++ .../clusters/delete_connect_cluster.py | 59 ++++++ .../connect/clusters/get_connect_cluster.py | 55 ++++++ .../connect/clusters/list_connect_clusters.py | 47 +++++ .../clusters/update_connect_cluster.py | 71 +++++++ 6 files changed, 492 insertions(+) create mode 100644 managedkafka/snippets/connect/clusters/clusters_test.py create mode 100644 managedkafka/snippets/connect/clusters/create_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/delete_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/get_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/list_connect_clusters.py create mode 100644 managedkafka/snippets/connect/clusters/update_connect_cluster.py diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py new file mode 100644 index 00000000000..6d8c2b1c69e --- /dev/null +++ b/managedkafka/snippets/connect/clusters/clusters_test.py @@ -0,0 +1,176 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +from unittest.mock import MagicMock + +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 +import pytest + +import create_connect_cluster +import delete_connect_cluster +import get_connect_cluster +import list_connect_clusters +import update_connect_cluster + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +KAFKA_CLUSTER_ID = "test-cluster-id" +CONNECT_CLUSTER_ID = "test-connect-cluster-id" + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connect_cluster" +) +def test_create_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + cpu = 3 + memory_bytes = 3221225472 + primary_subnet = "test-subnet" + operation = mock.MagicMock(spec=Operation) + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + operation.result = mock.MagicMock(return_value=connect_cluster) + mock_method.return_value = operation + + create_connect_cluster.create_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + kafka_cluster_id=KAFKA_CLUSTER_ID, + primary_subnet=primary_subnet, + cpu=cpu, + memory_bytes=memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Created Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connect_cluster" +) +def test_get_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + mock_method.return_value = connect_cluster + + get_connect_cluster.get_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connect_cluster" +) +def test_update_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + new_memory_bytes = 3221225475 + operation = mock.MagicMock(spec=Operation) + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + connect_cluster.capacity_config.memory_bytes = new_memory_bytes + operation.result = mock.MagicMock(return_value=connect_cluster) + mock_method.return_value = operation + + update_connect_cluster.update_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + memory_bytes=new_memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Updated Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + assert str(new_memory_bytes) in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connect_clusters" +) +def test_list_connect_clusters( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + + response = [connect_cluster] + mock_method.return_value = response + + list_connect_clusters.list_connect_clusters( + project_id=PROJECT_ID, + region=REGION, + ) + + out, _ = capsys.readouterr() + assert "Got Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connect_cluster" +) +def test_delete_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + mock_method.return_value = operation + + delete_connect_cluster.delete_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted Connect cluster" in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py new file mode 100644 index 00000000000..174e9fc94b5 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -0,0 +1,84 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, + kafka_cluster_id: str, + primary_subnet: str, + cpu: int, + memory_bytes: int, +) -> None: + """ + Create a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + kafka_cluster_id: The ID of the primary Managed Service for Apache Kafka cluster. + primary_subnet: The primary VPC subnet for the Connect cluster workers. The expected format is projects/{project_id}/regions/{region}/subnetworks/{subnet_id}. + cpu: Number of vCPUs to provision for the cluster. The minimum is 3. + memory_bytes: The memory to provision for the cluster in bytes. Must be between 1 GiB * cpu and 8 GiB * cpu. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # [START managedkafka_create_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig + + # TODO(developer): Update with your values. + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # kafka_cluster_id = "my-kafka-cluster" + # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" + # cpu = 3 + # memory_bytes = 3221225472 # 3 GiB + + connect_client = ManagedKafkaConnectClient() + kafka_client = managedkafka_v1.ManagedKafkaClient() + + parent = connect_client.common_location_path(project_id, region) + kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id) + + connect_cluster = ConnectCluster() + connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + connect_cluster.kafka_cluster = kafka_cluster_path + connect_cluster.capacity_config.vcpu_count = cpu + connect_cluster.capacity_config.memory_bytes = memory_bytes + connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)] + + request = CreateConnectClusterRequest( + parent=parent, + connect_cluster_id=connect_cluster_id, + connect_cluster=connect_cluster, + ) + + try: + operation = connect_client.create_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + # Creating a Connect cluster can take 20-30 minutes. + response = operation.result(timeout=1800) + print("Created Connect cluster:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_create_connect_cluster] \ No newline at end of file diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py new file mode 100644 index 00000000000..642a6cdac20 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -0,0 +1,59 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def delete_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + Delete a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # [START managedkafka_delete_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.DeleteConnectClusterRequest( + name=client.connect_cluster_path(project_id, region, connect_cluster_id), + ) + + try: + operation = client.delete_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print("Deleted Connect cluster") + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_delete_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/get_connect_cluster.py b/managedkafka/snippets/connect/clusters/get_connect_cluster.py new file mode 100644 index 00000000000..cd8adefbf72 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/get_connect_cluster.py @@ -0,0 +1,55 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def get_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + Get a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + + Raises: + This method will raise the NotFound exception if the Connect cluster is not found. + """ + # [START managedkafka_get_connect_cluster] + from google.api_core.exceptions import NotFound + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + client = ManagedKafkaConnectClient() + + cluster_path = client.connect_cluster_path(project_id, region, connect_cluster_id) + request = managedkafka_v1.GetConnectClusterRequest( + name=cluster_path, + ) + + try: + cluster = client.get_connect_cluster(request=request) + print("Got Connect cluster:", cluster) + except NotFound as e: + print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e.message}") + + # [END managedkafka_get_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py new file mode 100644 index 00000000000..049c23b3cdc --- /dev/null +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -0,0 +1,47 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def list_connect_clusters( + project_id: str, + region: str, +) -> None: + """ + List Kafka Connect clusters in a given project ID and region. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + """ + # [START managedkafka_list_connect_clusters] + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.ListConnectClustersRequest( + parent=connect_client.common_location_path(project_id, region), + ) + + response = connect_client.list_connect_clusters(request=request) + for cluster in response: + print("Got Connect cluster:", cluster) + + # [END managedkafka_list_connect_clusters] diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py new file mode 100644 index 00000000000..722136956e3 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -0,0 +1,71 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def update_connect_cluster( + project_id: str, region: str, connect_cluster_id: str, memory_bytes: int +) -> None: + """ + Update a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + memory_bytes: The memory to provision for the cluster in bytes. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # [START managedkafka_update_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import ConnectCluster + from google.protobuf import field_mask_pb2 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # memory_bytes = 4295000000 + + connect_client = ManagedKafkaConnectClient() + + connect_cluster = ConnectCluster() + connect_cluster.name = connect_client.connect_cluster_path( + project_id, region, connect_cluster_id + ) + connect_cluster.capacity_config.memory_bytes = memory_bytes + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("capacity_config.memory_bytes") + + # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties. + request = managedkafka_v1.UpdateConnectClusterRequest( + update_mask=update_mask, + connect_cluster=connect_cluster, + ) + + try: + operation = connect_client.update_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Updated Connect cluster:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_update_connect_cluster] From 8f2224a4c7104a3eab01faf7ada6c04851799bf4 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 24 Jul 2025 00:42:16 +0000 Subject: [PATCH 02/18] Update google-cloud-managedkafka version to 0.1.12 --- managedkafka/snippets/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/requirements.txt b/managedkafka/snippets/requirements.txt index a7da4ff6516..5f372e81c41 100644 --- a/managedkafka/snippets/requirements.txt +++ b/managedkafka/snippets/requirements.txt @@ -2,5 +2,5 @@ protobuf==5.29.4 pytest==8.2.2 google-api-core==2.23.0 google-auth==2.38.0 -google-cloud-managedkafka==0.1.5 +google-cloud-managedkafka==0.1.12 googleapis-common-protos==1.66.0 From 1feb3544a25608ba003dede93ccf1ffa4db967a1 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:00 -0400 Subject: [PATCH 03/18] Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/create_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 174e9fc94b5..1d3cf0a9284 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -49,7 +49,7 @@ def create_connect_cluster( # region = "us-central1" # connect_cluster_id = "my-connect-cluster" # kafka_cluster_id = "my-kafka-cluster" - # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" + # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" # cpu = 3 # memory_bytes = 3221225472 # 3 GiB From 5840336f20298fce760ca3803048b2b423a9551a Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:24 -0400 Subject: [PATCH 04/18] Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/create_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 1d3cf0a9284..004ed8610bd 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -79,6 +79,6 @@ def create_connect_cluster( response = operation.result(timeout=1800) print("Created Connect cluster:", response) except GoogleAPICallError as e: - print(f"The operation failed with error: {e.message}") + print(f"The operation failed with error: {e}") # [END managedkafka_create_connect_cluster] \ No newline at end of file From d0df10fa42b6e6048b173cf88a3de524515c4d47 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:35 -0400 Subject: [PATCH 05/18] Update managedkafka/snippets/connect/clusters/delete_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/delete_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py index 642a6cdac20..f05bc1fbdcf 100644 --- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -54,6 +54,6 @@ def delete_connect_cluster( operation.result() print("Deleted Connect cluster") except GoogleAPICallError as e: - print(f"The operation failed with error: {e.message}") + print(f"The operation failed with error: {e}") # [END managedkafka_delete_connect_cluster] From 505759a4cf37c1359b6217bbc87cad629fff5923 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:42 -0400 Subject: [PATCH 06/18] Update managedkafka/snippets/connect/clusters/get_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- managedkafka/snippets/connect/clusters/get_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/get_connect_cluster.py b/managedkafka/snippets/connect/clusters/get_connect_cluster.py index cd8adefbf72..8dfd39b5958 100644 --- a/managedkafka/snippets/connect/clusters/get_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/get_connect_cluster.py @@ -50,6 +50,6 @@ def get_connect_cluster( cluster = client.get_connect_cluster(request=request) print("Got Connect cluster:", cluster) except NotFound as e: - print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e.message}") + print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e}") # [END managedkafka_get_connect_cluster] From d3fba2c9784ad8f49e9b8941bbf8101515de8e65 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:51:04 -0400 Subject: [PATCH 07/18] Update managedkafka/snippets/connect/clusters/list_connect_clusters.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/list_connect_clusters.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py index 049c23b3cdc..615626017db 100644 --- a/managedkafka/snippets/connect/clusters/list_connect_clusters.py +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -42,6 +42,11 @@ def list_connect_clusters( response = connect_client.list_connect_clusters(request=request) for cluster in response: - print("Got Connect cluster:", cluster) + try: + response = connect_client.list_connect_clusters(request=request) + for cluster in response: + print("Got Connect cluster:", cluster) + except GoogleAPICallError as e: + print(f"Failed to list Connect clusters with error: {e}") # [END managedkafka_list_connect_clusters] From 0a85a8bf9d725d7c7a0b310b5f6032110e558f4b Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:51:16 -0400 Subject: [PATCH 08/18] Update managedkafka/snippets/connect/clusters/update_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/update_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py index 722136956e3..36ec1c5ec06 100644 --- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -66,6 +66,6 @@ def update_connect_cluster( response = operation.result() print("Updated Connect cluster:", response) except GoogleAPICallError as e: - print(f"The operation failed with error: {e.message}") + print(f"The operation failed with error: {e}") # [END managedkafka_update_connect_cluster] From dfa07d2ac8ccb59e61878e154a4ba0d8f7f859b8 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 24 Jul 2025 01:00:43 +0000 Subject: [PATCH 09/18] Add timeouts and improve error handling. --- .../snippets/connect/clusters/create_connect_cluster.py | 7 ++++--- .../snippets/connect/clusters/delete_connect_cluster.py | 3 ++- .../snippets/connect/clusters/list_connect_clusters.py | 9 ++++----- .../snippets/connect/clusters/update_connect_cluster.py | 2 ++ 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 004ed8610bd..6dfc218e684 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -75,10 +75,11 @@ def create_connect_cluster( try: operation = connect_client.create_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") - # Creating a Connect cluster can take 20-30 minutes. - response = operation.result(timeout=1800) + # Creating a Connect cluster can take 10-40 minutes. + response = operation.result(timeout=3000) print("Created Connect cluster:", response) except GoogleAPICallError as e: print(f"The operation failed with error: {e}") - # [END managedkafka_create_connect_cluster] \ No newline at end of file + # [END managedkafka_create_connect_cluster] + \ No newline at end of file diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py index f05bc1fbdcf..b044fd2b047 100644 --- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -51,7 +51,8 @@ def delete_connect_cluster( try: operation = client.delete_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") - operation.result() + # Deleting a Connect cluster can take 10-40 minutes. + operation.result(timeout=3000) print("Deleted Connect cluster") except GoogleAPICallError as e: print(f"The operation failed with error: {e}") diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py index 615626017db..749a5267d91 100644 --- a/managedkafka/snippets/connect/clusters/list_connect_clusters.py +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -29,6 +29,7 @@ def list_connect_clusters( from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( ManagedKafkaConnectClient, ) + from google.api_core.exceptions import GoogleAPICallError # TODO(developer) # project_id = "my-project-id" @@ -42,11 +43,9 @@ def list_connect_clusters( response = connect_client.list_connect_clusters(request=request) for cluster in response: - try: - response = connect_client.list_connect_clusters(request=request) - for cluster in response: + try: print("Got Connect cluster:", cluster) - except GoogleAPICallError as e: - print(f"Failed to list Connect clusters with error: {e}") + except GoogleAPICallError as e: + print(f"Failed to list Connect clusters with error: {e}") # [END managedkafka_list_connect_clusters] diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py index 36ec1c5ec06..f7ed0d0248e 100644 --- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -63,6 +63,8 @@ def update_connect_cluster( try: operation = connect_client.update_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") + # Updating a Connect cluster can take 10-40 minutes. + operation.result(timeout=3000) response = operation.result() print("Updated Connect cluster:", response) except GoogleAPICallError as e: From ab697bfab4b1f26ff4ed5e0d9226fafe9800d889 Mon Sep 17 00:00:00 2001 From: salmany Date: Mon, 28 Jul 2025 19:44:31 +0000 Subject: [PATCH 10/18] Addressed PR comments. --- .../snippets/connect/clusters/clusters_test.py | 6 +++--- .../connect/clusters/create_connect_cluster.py | 10 +++++++++- .../connect/clusters/delete_connect_cluster.py | 6 ++---- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py index 6d8c2b1c69e..26fea584393 100644 --- a/managedkafka/snippets/connect/clusters/clusters_test.py +++ b/managedkafka/snippets/connect/clusters/clusters_test.py @@ -38,8 +38,8 @@ def test_create_connect_cluster( mock_method: MagicMock, capsys: pytest.CaptureFixture[str], ) -> None: - cpu = 3 - memory_bytes = 3221225472 + cpu = 12 + memory_bytes = 12884901900 # 12 GB primary_subnet = "test-subnet" operation = mock.MagicMock(spec=Operation) connect_cluster = managedkafka_v1.types.ConnectCluster() @@ -101,7 +101,7 @@ def test_update_connect_cluster( mock_method: MagicMock, capsys: pytest.CaptureFixture[str], ) -> None: - new_memory_bytes = 3221225475 + new_memory_bytes = 12884901900 # 12 GB operation = mock.MagicMock(spec=Operation) connect_cluster = managedkafka_v1.types.ConnectCluster() connect_cluster.name = ( diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 6dfc218e684..3187841e86a 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -65,6 +65,15 @@ def create_connect_cluster( connect_cluster.capacity_config.vcpu_count = cpu connect_cluster.capacity_config.memory_bytes = memory_bytes connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)] + # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration. + # For example: + # connect_cluster.gcp_config.access_config.network_configs = [ + # ConnectNetworkConfig( + # primary_subnet=primary_subnet, + # additional_subnets=additional_subnets, + # dns_domain_names=dns_domain_names, + # ) + # ] request = CreateConnectClusterRequest( parent=parent, @@ -82,4 +91,3 @@ def create_connect_cluster( print(f"The operation failed with error: {e}") # [END managedkafka_create_connect_cluster] - \ No newline at end of file diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py index b044fd2b047..84258fe830f 100644 --- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -27,8 +27,7 @@ def delete_connect_cluster( connect_cluster_id: ID of the Kafka Connect cluster. Raises: - This method will raise the GoogleAPICallError exception if the operation errors or - the timeout before the operation completes is reached. + This method will raise the GoogleAPICallError exception if the operation errors. """ # [START managedkafka_delete_connect_cluster] from google.api_core.exceptions import GoogleAPICallError @@ -51,8 +50,7 @@ def delete_connect_cluster( try: operation = client.delete_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") - # Deleting a Connect cluster can take 10-40 minutes. - operation.result(timeout=3000) + operation.result() print("Deleted Connect cluster") except GoogleAPICallError as e: print(f"The operation failed with error: {e}") From 333932509b10373dea0ae9d5746f1f662e0737ee Mon Sep 17 00:00:00 2001 From: salmany Date: Tue, 29 Jul 2025 01:00:38 +0000 Subject: [PATCH 11/18] Adds requirements.txt file for samples. As per the [Authoring Guide] (https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#dependencies), each sample is required to have a requirements.txt file that lists the dependencies needed to run the sample. --- managedkafka/snippets/connect/clusters/requirements.txt | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 managedkafka/snippets/connect/clusters/requirements.txt diff --git a/managedkafka/snippets/connect/clusters/requirements.txt b/managedkafka/snippets/connect/clusters/requirements.txt new file mode 100644 index 00000000000..5f372e81c41 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/requirements.txt @@ -0,0 +1,6 @@ +protobuf==5.29.4 +pytest==8.2.2 +google-api-core==2.23.0 +google-auth==2.38.0 +google-cloud-managedkafka==0.1.12 +googleapis-common-protos==1.66.0 From 8391c7712112a0645d2b5522f55b90e0a686995f Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 30 Jul 2025 17:03:12 -0400 Subject: [PATCH 12/18] Remove timeout in update_connect_cluster.py Remove timeout to align with Managed Kafka Cluster update example: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/cdae4cacfe8f9612e554af11ef72bc8d34765ada/managedkafka/snippets/clusters/update_cluster.py#L60 --- .../snippets/connect/clusters/update_connect_cluster.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py index f7ed0d0248e..16587046949 100644 --- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -63,8 +63,7 @@ def update_connect_cluster( try: operation = connect_client.update_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") - # Updating a Connect cluster can take 10-40 minutes. - operation.result(timeout=3000) + operation.result() response = operation.result() print("Updated Connect cluster:", response) except GoogleAPICallError as e: From 791fa63a9866f779661e02a6d0a7f342d327778d Mon Sep 17 00:00:00 2001 From: salmany Date: Mon, 4 Aug 2025 18:27:09 +0000 Subject: [PATCH 13/18] Fix CPU and memory values for Cluster creation. --- .../snippets/connect/clusters/create_connect_cluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 3187841e86a..f0175f19eff 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -50,8 +50,8 @@ def create_connect_cluster( # connect_cluster_id = "my-connect-cluster" # kafka_cluster_id = "my-kafka-cluster" # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" - # cpu = 3 - # memory_bytes = 3221225472 # 3 GiB + # cpu = 12 + # memory_bytes = 12884901888 # 12 GiB connect_client = ManagedKafkaConnectClient() kafka_client = managedkafka_v1.ManagedKafkaClient() From 9c5bc389692d070c84f0cf58f1ae26d46d87a9ff Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 7 Aug 2025 21:13:25 +0000 Subject: [PATCH 14/18] Fixed comment for minimum vCpu. --- .../snippets/connect/clusters/create_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index f0175f19eff..26a4e926ac4 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -31,7 +31,7 @@ def create_connect_cluster( connect_cluster_id: ID of the Kafka Connect cluster. kafka_cluster_id: The ID of the primary Managed Service for Apache Kafka cluster. primary_subnet: The primary VPC subnet for the Connect cluster workers. The expected format is projects/{project_id}/regions/{region}/subnetworks/{subnet_id}. - cpu: Number of vCPUs to provision for the cluster. The minimum is 3. + cpu: Number of vCPUs to provision for the cluster. The minimum is 12. memory_bytes: The memory to provision for the cluster in bytes. Must be between 1 GiB * cpu and 8 GiB * cpu. Raises: From 14adc9a413362dff6ca811b702485e819617029a Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 7 Aug 2025 22:28:31 +0000 Subject: [PATCH 15/18] Fix lint by adding space. --- .../snippets/connect/clusters/create_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 26a4e926ac4..c3045ed84d1 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -51,7 +51,7 @@ def create_connect_cluster( # kafka_cluster_id = "my-kafka-cluster" # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" # cpu = 12 - # memory_bytes = 12884901888 # 12 GiB + # memory_bytes = 12884901888 # 12 GiB connect_client = ManagedKafkaConnectClient() kafka_client = managedkafka_v1.ManagedKafkaClient() From 831b4f1182cc0f03ce33552b9d8724be62b8ddf8 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 7 Aug 2025 23:49:00 +0000 Subject: [PATCH 16/18] Fix lint with space. --- managedkafka/snippets/connect/clusters/clusters_test.py | 2 +- managedkafka/snippets/connect/connectors/connectors_test.py | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 managedkafka/snippets/connect/connectors/connectors_test.py diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py index 26fea584393..a1634cec768 100644 --- a/managedkafka/snippets/connect/clusters/clusters_test.py +++ b/managedkafka/snippets/connect/clusters/clusters_test.py @@ -39,7 +39,7 @@ def test_create_connect_cluster( capsys: pytest.CaptureFixture[str], ) -> None: cpu = 12 - memory_bytes = 12884901900 # 12 GB + memory_bytes = 12884901900 # 12 GB primary_subnet = "test-subnet" operation = mock.MagicMock(spec=Operation) connect_cluster = managedkafka_v1.types.ConnectCluster() diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py new file mode 100644 index 00000000000..e69de29bb2d From df0bd79cab32391d2baa182a122747470d6f5edf Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 7 Aug 2025 23:51:29 +0000 Subject: [PATCH 17/18] Remove file added by mistake. --- managedkafka/snippets/connect/connectors/connectors_test.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 managedkafka/snippets/connect/connectors/connectors_test.py diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py deleted file mode 100644 index e69de29bb2d..00000000000 From eb35dda95f884afd324612953eedc075ea6b55d6 Mon Sep 17 00:00:00 2001 From: Katie McLaughlin Date: Fri, 8 Aug 2025 11:50:30 +1000 Subject: [PATCH 18/18] lint: fix lint errors --- managedkafka/snippets/connect/clusters/clusters_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py index a1634cec768..bb3b7295428 100644 --- a/managedkafka/snippets/connect/clusters/clusters_test.py +++ b/managedkafka/snippets/connect/clusters/clusters_test.py @@ -19,7 +19,7 @@ from google.cloud import managedkafka_v1 import pytest -import create_connect_cluster +import create_connect_cluster # noqa: I100 import delete_connect_cluster import get_connect_cluster import list_connect_clusters