From c928344cb9c3fbbd466202f616a9d08ac84cd685 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 24 Jul 2025 00:36:22 +0000 Subject: [PATCH 01/19] Add Managed Kafka Connect code samples for clusters * Adds code examples for creating, deleting, getting, listing and updating Managed Kafka Connect clusters --- .../connect/clusters/clusters_test.py | 176 ++++++++++++++++++ .../clusters/create_connect_cluster.py | 84 +++++++++ .../clusters/delete_connect_cluster.py | 59 ++++++ .../connect/clusters/get_connect_cluster.py | 55 ++++++ .../connect/clusters/list_connect_clusters.py | 47 +++++ .../clusters/update_connect_cluster.py | 71 +++++++ 6 files changed, 492 insertions(+) create mode 100644 managedkafka/snippets/connect/clusters/clusters_test.py create mode 100644 managedkafka/snippets/connect/clusters/create_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/delete_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/get_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/list_connect_clusters.py create mode 100644 managedkafka/snippets/connect/clusters/update_connect_cluster.py diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py new file mode 100644 index 00000000000..6d8c2b1c69e --- /dev/null +++ b/managedkafka/snippets/connect/clusters/clusters_test.py @@ -0,0 +1,176 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
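+
+# These tests run the Connect cluster samples against mocked client methods,
+# so they only verify request construction and printed output; no Google
+# Cloud resources are created.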
+ +from unittest import mock +from unittest.mock import MagicMock + +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 +import pytest + +import create_connect_cluster +import delete_connect_cluster +import get_connect_cluster +import list_connect_clusters +import update_connect_cluster + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +KAFKA_CLUSTER_ID = "test-cluster-id" +CONNECT_CLUSTER_ID = "test-connect-cluster-id" + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connect_cluster" +) +def test_create_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + cpu = 3 + memory_bytes = 3221225472 + primary_subnet = "test-subnet" + operation = mock.MagicMock(spec=Operation) + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + operation.result = mock.MagicMock(return_value=connect_cluster) + mock_method.return_value = operation + + create_connect_cluster.create_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + kafka_cluster_id=KAFKA_CLUSTER_ID, + primary_subnet=primary_subnet, + cpu=cpu, + memory_bytes=memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Created Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connect_cluster" +) +def test_get_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + mock_method.return_value = connect_cluster + + get_connect_cluster.get_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connect_cluster" +) +def test_update_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + new_memory_bytes = 3221225475 + operation = mock.MagicMock(spec=Operation) + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + connect_cluster.capacity_config.memory_bytes = new_memory_bytes + operation.result = mock.MagicMock(return_value=connect_cluster) + mock_method.return_value = operation + + update_connect_cluster.update_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + memory_bytes=new_memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Updated Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + assert str(new_memory_bytes) in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connect_clusters" +) +def test_list_connect_clusters( + mock_method: MagicMock, + capsys: 
pytest.CaptureFixture[str], +) -> None: + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + + response = [connect_cluster] + mock_method.return_value = response + + list_connect_clusters.list_connect_clusters( + project_id=PROJECT_ID, + region=REGION, + ) + + out, _ = capsys.readouterr() + assert "Got Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connect_cluster" +) +def test_delete_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + mock_method.return_value = operation + + delete_connect_cluster.delete_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted Connect cluster" in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py new file mode 100644 index 00000000000..174e9fc94b5 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -0,0 +1,84 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, + kafka_cluster_id: str, + primary_subnet: str, + cpu: int, + memory_bytes: int, +) -> None: + """ + Create a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + kafka_cluster_id: The ID of the primary Managed Service for Apache Kafka cluster. + primary_subnet: The primary VPC subnet for the Connect cluster workers. The expected format is projects/{project_id}/regions/{region}/subnetworks/{subnet_id}. + cpu: Number of vCPUs to provision for the cluster. The minimum is 3. + memory_bytes: The memory to provision for the cluster in bytes. Must be between 1 GiB * cpu and 8 GiB * cpu. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # [START managedkafka_create_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig + + # TODO(developer): Update with your values. 
+ # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # kafka_cluster_id = "my-kafka-cluster" + # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" + # cpu = 3 + # memory_bytes = 3221225472 # 3 GiB + + connect_client = ManagedKafkaConnectClient() + kafka_client = managedkafka_v1.ManagedKafkaClient() + + parent = connect_client.common_location_path(project_id, region) + kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id) + + connect_cluster = ConnectCluster() + connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + connect_cluster.kafka_cluster = kafka_cluster_path + connect_cluster.capacity_config.vcpu_count = cpu + connect_cluster.capacity_config.memory_bytes = memory_bytes + connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)] + + request = CreateConnectClusterRequest( + parent=parent, + connect_cluster_id=connect_cluster_id, + connect_cluster=connect_cluster, + ) + + try: + operation = connect_client.create_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + # Creating a Connect cluster can take 20-30 minutes. + response = operation.result(timeout=1800) + print("Created Connect cluster:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_create_connect_cluster] \ No newline at end of file diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py new file mode 100644 index 00000000000..642a6cdac20 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -0,0 +1,59 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def delete_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + Delete a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. 
+ """ + # [START managedkafka_delete_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.DeleteConnectClusterRequest( + name=client.connect_cluster_path(project_id, region, connect_cluster_id), + ) + + try: + operation = client.delete_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print("Deleted Connect cluster") + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_delete_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/get_connect_cluster.py b/managedkafka/snippets/connect/clusters/get_connect_cluster.py new file mode 100644 index 00000000000..cd8adefbf72 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/get_connect_cluster.py @@ -0,0 +1,55 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def get_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + Get a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + + Raises: + This method will raise the NotFound exception if the Connect cluster is not found. + """ + # [START managedkafka_get_connect_cluster] + from google.api_core.exceptions import NotFound + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + client = ManagedKafkaConnectClient() + + cluster_path = client.connect_cluster_path(project_id, region, connect_cluster_id) + request = managedkafka_v1.GetConnectClusterRequest( + name=cluster_path, + ) + + try: + cluster = client.get_connect_cluster(request=request) + print("Got Connect cluster:", cluster) + except NotFound as e: + print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e.message}") + + # [END managedkafka_get_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py new file mode 100644 index 00000000000..049c23b3cdc --- /dev/null +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -0,0 +1,47 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def list_connect_clusters( + project_id: str, + region: str, +) -> None: + """ + List Kafka Connect clusters in a given project ID and region. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + """ + # [START managedkafka_list_connect_clusters] + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.ListConnectClustersRequest( + parent=connect_client.common_location_path(project_id, region), + ) + + response = connect_client.list_connect_clusters(request=request) + for cluster in response: + print("Got Connect cluster:", cluster) + + # [END managedkafka_list_connect_clusters] diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py new file mode 100644 index 00000000000..722136956e3 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -0,0 +1,71 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def update_connect_cluster( + project_id: str, region: str, connect_cluster_id: str, memory_bytes: int +) -> None: + """ + Update a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + memory_bytes: The memory to provision for the cluster in bytes. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. 
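+
+    Example (illustrative values, matching the commented defaults below):
+        update_connect_cluster("my-project-id", "us-central1", "my-connect-cluster", 4295000000)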
+ """ + # [START managedkafka_update_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import ConnectCluster + from google.protobuf import field_mask_pb2 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # memory_bytes = 4295000000 + + connect_client = ManagedKafkaConnectClient() + + connect_cluster = ConnectCluster() + connect_cluster.name = connect_client.connect_cluster_path( + project_id, region, connect_cluster_id + ) + connect_cluster.capacity_config.memory_bytes = memory_bytes + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("capacity_config.memory_bytes") + + # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties. + request = managedkafka_v1.UpdateConnectClusterRequest( + update_mask=update_mask, + connect_cluster=connect_cluster, + ) + + try: + operation = connect_client.update_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Updated Connect cluster:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_update_connect_cluster] From 8f2224a4c7104a3eab01faf7ada6c04851799bf4 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 24 Jul 2025 00:42:16 +0000 Subject: [PATCH 02/19] Update google-cloud-managedkafka version to 0.1.12 --- managedkafka/snippets/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/requirements.txt b/managedkafka/snippets/requirements.txt index a7da4ff6516..5f372e81c41 100644 --- a/managedkafka/snippets/requirements.txt +++ b/managedkafka/snippets/requirements.txt @@ -2,5 +2,5 @@ protobuf==5.29.4 pytest==8.2.2 google-api-core==2.23.0 google-auth==2.38.0 -google-cloud-managedkafka==0.1.5 +google-cloud-managedkafka==0.1.12 googleapis-common-protos==1.66.0 From 1feb3544a25608ba003dede93ccf1ffa4db967a1 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:00 -0400 Subject: [PATCH 03/19] Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/create_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 174e9fc94b5..1d3cf0a9284 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -49,7 +49,7 @@ def create_connect_cluster( # region = "us-central1" # connect_cluster_id = "my-connect-cluster" # kafka_cluster_id = "my-kafka-cluster" - # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" + # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" # cpu = 3 # memory_bytes = 3221225472 # 3 GiB From 5840336f20298fce760ca3803048b2b423a9551a Mon Sep 17 00:00:00 2001 From: Salman Yousaf 
<37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:24 -0400 Subject: [PATCH 04/19] Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/create_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 1d3cf0a9284..004ed8610bd 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -79,6 +79,6 @@ def create_connect_cluster( response = operation.result(timeout=1800) print("Created Connect cluster:", response) except GoogleAPICallError as e: - print(f"The operation failed with error: {e.message}") + print(f"The operation failed with error: {e}") # [END managedkafka_create_connect_cluster] \ No newline at end of file From d0df10fa42b6e6048b173cf88a3de524515c4d47 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:35 -0400 Subject: [PATCH 05/19] Update managedkafka/snippets/connect/clusters/delete_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/delete_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py index 642a6cdac20..f05bc1fbdcf 100644 --- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -54,6 +54,6 @@ def delete_connect_cluster( operation.result() print("Deleted Connect cluster") except GoogleAPICallError as e: - print(f"The operation failed with error: {e.message}") + print(f"The operation failed with error: {e}") # [END managedkafka_delete_connect_cluster] From 505759a4cf37c1359b6217bbc87cad629fff5923 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:42 -0400 Subject: [PATCH 06/19] Update managedkafka/snippets/connect/clusters/get_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- managedkafka/snippets/connect/clusters/get_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/get_connect_cluster.py b/managedkafka/snippets/connect/clusters/get_connect_cluster.py index cd8adefbf72..8dfd39b5958 100644 --- a/managedkafka/snippets/connect/clusters/get_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/get_connect_cluster.py @@ -50,6 +50,6 @@ def get_connect_cluster( cluster = client.get_connect_cluster(request=request) print("Got Connect cluster:", cluster) except NotFound as e: - print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e.message}") + print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e}") # [END managedkafka_get_connect_cluster] From d3fba2c9784ad8f49e9b8941bbf8101515de8e65 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:51:04 -0400 Subject: [PATCH 07/19] Update 
managedkafka/snippets/connect/clusters/list_connect_clusters.py

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
 .../snippets/connect/clusters/list_connect_clusters.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py
index 049c23b3cdc..615626017db 100644
--- a/managedkafka/snippets/connect/clusters/list_connect_clusters.py
+++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py
@@ -26,6 +26,7 @@ def list_connect_clusters(
         region: Cloud region.
     """
     # [START managedkafka_list_connect_clusters]
+    from google.api_core.exceptions import GoogleAPICallError
     from google.cloud import managedkafka_v1
     from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
         ManagedKafkaConnectClient,
@@ -40,9 +41,12 @@ def list_connect_clusters(
     request = managedkafka_v1.ListConnectClustersRequest(
         parent=connect_client.common_location_path(project_id, region),
     )
 
-    response = connect_client.list_connect_clusters(request=request)
-    for cluster in response:
-        print("Got Connect cluster:", cluster)
+    try:
+        response = connect_client.list_connect_clusters(request=request)
+        for cluster in response:
+            print("Got Connect cluster:", cluster)
+    except GoogleAPICallError as e:
+        print(f"Failed to list Connect clusters with error: {e}")
 
     # [END managedkafka_list_connect_clusters]

From 0a85a8bf9d725d7c7a0b310b5f6032110e558f4b Mon Sep 17 00:00:00 2001
From: Salman Yousaf <37085288+salmany@users.noreply.github.com>
Date: Wed, 23 Jul 2025 20:51:16 -0400
Subject: [PATCH 08/19] Update
 managedkafka/snippets/connect/clusters/update_connect_cluster.py

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
 .../snippets/connect/clusters/update_connect_cluster.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py
index 722136956e3..36ec1c5ec06 100644
--- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py
@@ -66,6 +66,6 @@ def update_connect_cluster(
         response = operation.result()
         print("Updated Connect cluster:", response)
     except GoogleAPICallError as e:
-        print(f"The operation failed with error: {e.message}")
+        print(f"The operation failed with error: {e}")
 
     # [END managedkafka_update_connect_cluster]

From dfa07d2ac8ccb59e61878e154a4ba0d8f7f859b8 Mon Sep 17 00:00:00 2001
From: salmany
Date: Thu, 24 Jul 2025 01:00:43 +0000
Subject: [PATCH 09/19] Add timeouts to long-running operations.

---
 .../snippets/connect/clusters/create_connect_cluster.py | 8 +++++---
 .../snippets/connect/clusters/delete_connect_cluster.py | 3 ++-
 .../snippets/connect/clusters/update_connect_cluster.py | 3 ++-
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
index 004ed8610bd..6dfc218e684 100644
--- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
@@ -75,10 +75,12 @@ def create_connect_cluster(
     try:
         operation = connect_client.create_connect_cluster(request=request)
         print(f"Waiting for operation {operation.operation.name} to complete...")
-        # Creating a Connect cluster can take 20-30 minutes.
-        response = operation.result(timeout=1800)
+        # Creating a Connect cluster can take 10-40 minutes.
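+        # The timeout is in seconds; operation.result() raises if the operation is not done in time.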
+        response = operation.result(timeout=3000)
         print("Created Connect cluster:", response)
     except GoogleAPICallError as e:
         print(f"The operation failed with error: {e}")
 
-    # [END managedkafka_create_connect_cluster]
\ No newline at end of file
+    # [END managedkafka_create_connect_cluster]
+    
\ No newline at end of file
diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py
index f05bc1fbdcf..b044fd2b047 100644
--- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py
@@ -51,7 +51,8 @@ def delete_connect_cluster(
     try:
         operation = client.delete_connect_cluster(request=request)
         print(f"Waiting for operation {operation.operation.name} to complete...")
-        operation.result()
+        # Deleting a Connect cluster can take 10-40 minutes.
+        operation.result(timeout=3000)
         print("Deleted Connect cluster")
     except GoogleAPICallError as e:
         print(f"The operation failed with error: {e}")
diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py
index 36ec1c5ec06..f7ed0d0248e 100644
--- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py
@@ -63,6 +63,7 @@ def update_connect_cluster(
     try:
         operation = connect_client.update_connect_cluster(request=request)
         print(f"Waiting for operation {operation.operation.name} to complete...")
-        response = operation.result()
+        # Updating a Connect cluster can take 10-40 minutes.
+        response = operation.result(timeout=3000)
         print("Updated Connect cluster:", response)
     except GoogleAPICallError as e:

From ab697bfab4b1f26ff4ed5e0d9226fafe9800d889 Mon Sep 17 00:00:00 2001
From: salmany
Date: Mon, 28 Jul 2025 19:44:31 +0000
Subject: [PATCH 10/19] Addressed PR comments.
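
* Use larger capacity values in the cluster tests
* Document optional subnet and DNS settings in the create sample
* Drop the explicit delete timeout and simplify the delete docstring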
---
 .../snippets/connect/clusters/clusters_test.py |  6 +++---
 .../connect/clusters/create_connect_cluster.py | 10 +++++++++-
 .../connect/clusters/delete_connect_cluster.py |  6 ++----
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py
index 6d8c2b1c69e..26fea584393 100644
--- a/managedkafka/snippets/connect/clusters/clusters_test.py
+++ b/managedkafka/snippets/connect/clusters/clusters_test.py
@@ -38,8 +38,8 @@ def test_create_connect_cluster(
     mock_method: MagicMock,
     capsys: pytest.CaptureFixture[str],
 ) -> None:
-    cpu = 3
-    memory_bytes = 3221225472
+    cpu = 12
+    memory_bytes = 12884901900  # 12 GiB
     primary_subnet = "test-subnet"
     operation = mock.MagicMock(spec=Operation)
     connect_cluster = managedkafka_v1.types.ConnectCluster()
@@ -101,7 +101,7 @@ def test_update_connect_cluster(
     mock_method: MagicMock,
     capsys: pytest.CaptureFixture[str],
 ) -> None:
-    new_memory_bytes = 3221225475
+    new_memory_bytes = 12884901900  # 12 GiB
     operation = mock.MagicMock(spec=Operation)
     connect_cluster = managedkafka_v1.types.ConnectCluster()
     connect_cluster.name = (
diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
index 6dfc218e684..3187841e86a 100644
--- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
@@ -65,6 +65,15 @@ def create_connect_cluster(
     connect_cluster.capacity_config.vcpu_count = cpu
     connect_cluster.capacity_config.memory_bytes = memory_bytes
     connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
+    # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
+    # For example:
+    # connect_cluster.gcp_config.access_config.network_configs = [
+    #     ConnectNetworkConfig(
+    #         primary_subnet=primary_subnet,
+    #         additional_subnets=additional_subnets,
+    #         dns_domain_names=dns_domain_names,
+    #     )
+    # ]
 
     request = CreateConnectClusterRequest(
         parent=parent,
         connect_cluster_id=connect_cluster_id,
         connect_cluster=connect_cluster,
@@ -82,4 +91,3 @@ def create_connect_cluster(
         print(f"The operation failed with error: {e}")
 
     # [END managedkafka_create_connect_cluster]
-    
\ No newline at end of file
diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py
index b044fd2b047..84258fe830f 100644
--- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py
@@ -27,8 +27,7 @@ def delete_connect_cluster(
         connect_cluster_id: ID of the Kafka Connect cluster.
 
     Raises:
-        This method will raise the GoogleAPICallError exception if the operation errors or
-        the timeout before the operation completes is reached.
+        This method will raise the GoogleAPICallError exception if the operation errors.
     """
     # [START managedkafka_delete_connect_cluster]
     from google.api_core.exceptions import GoogleAPICallError
@@ -51,8 +50,7 @@ def delete_connect_cluster(
     try:
         operation = client.delete_connect_cluster(request=request)
         print(f"Waiting for operation {operation.operation.name} to complete...")
-        # Deleting a Connect cluster can take 10-40 minutes.
- operation.result(timeout=3000) + operation.result() print("Deleted Connect cluster") except GoogleAPICallError as e: print(f"The operation failed with error: {e}") From 8af6e36fa44855b7fe7e76192d893a69ab9b0330 Mon Sep 17 00:00:00 2001 From: salmany Date: Tue, 29 Jul 2025 00:26:47 +0000 Subject: [PATCH 11/19] Add Managed Kafka Connect Connectors examples This commit adds examples for creating Managed Kafka Connect connectors: * MirrorMaker connector * BigQuery sink connector * Cloud Storage sink connector * Pub/Sub sink connector * Pub/Sub source connector --- .../connect/connectors/connectors_test.py | 188 ++++++++++++++++++ .../create_bigquery_sink_connector.py | 83 ++++++++ .../create_cloud_storage_sink_connector.py | 87 ++++++++ .../create_mirrormaker_connector.py | 112 +++++++++++ .../create_pubsub_sink_connector.py | 82 ++++++++ .../create_pubsub_source_connector.py | 82 ++++++++ .../connect/connectors/requirements.txt | 6 + 7 files changed, 640 insertions(+) create mode 100644 managedkafka/snippets/connect/connectors/connectors_test.py create mode 100644 managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py create mode 100644 managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py create mode 100644 managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py create mode 100644 managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py create mode 100644 managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py create mode 100644 managedkafka/snippets/connect/connectors/requirements.txt diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py new file mode 100644 index 00000000000..d8d9690f1c3 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/connectors_test.py @@ -0,0 +1,188 @@ +from unittest import mock +from unittest.mock import MagicMock +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 +import pytest + +import create_bigquery_sink_connector +import create_cloud_storage_sink_connector +import create_mirrormaker_connector +import create_pubsub_sink_connector +import create_pubsub_source_connector + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +CONNECT_CLUSTER_ID = "test-connect-cluster-id" + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_mirrormaker2_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "MM2_CONNECTOR_ID" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = connector_id + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_mirrormaker_connector.create_mirrormaker_connector( + PROJECT_ID, + REGION, + CONNECT_CLUSTER_ID, + connector_id, + "GMK_TOPIC_NAME", + "source", + "target", + "GMK_SOURCE_CLUSTER_DNS", + "GMK_TARGET_CLUSTER_DNS", + "1", + "SASL_SSL", + "OAUTHBEARER", + "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler", + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;", + "SASL_SSL", + "OAUTHBEARER", + "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler", + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;", + ) + + out, _ = capsys.readouterr() + assert "Created Connector" in out + assert connector_id 
in out
+    mock_method.assert_called_once()
+
+
+@mock.patch(
+    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
+)
+def test_create_pubsub_source_connector(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+) -> None:
+    connector_id = "CPS_SOURCE_CONNECTOR_ID"
+    operation = mock.MagicMock(spec=Operation)
+    connector = managedkafka_v1.types.Connector()
+    connector.name = connector_id
+    operation.result = mock.MagicMock(return_value=connector)
+    mock_method.return_value = operation
+
+    create_pubsub_source_connector.create_pubsub_source_connector(
+        PROJECT_ID,
+        REGION,
+        CONNECT_CLUSTER_ID,
+        connector_id,
+        "GMK_TOPIC_ID",
+        "CPS_SUBSCRIPTION_ID",
+        "GCP_PROJECT_ID",
+        "1",
+        "org.apache.kafka.connect.converters.ByteArrayConverter",
+        "org.apache.kafka.connect.storage.StringConverter",
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Created Connector" in out
+    assert connector_id in out
+    mock_method.assert_called_once()
+
+
+@mock.patch(
+    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
+)
+def test_create_pubsub_sink_connector(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+) -> None:
+    connector_id = "CPS_SINK_CONNECTOR_ID"
+    operation = mock.MagicMock(spec=Operation)
+    connector = managedkafka_v1.types.Connector()
+    connector.name = connector_id
+    operation.result = mock.MagicMock(return_value=connector)
+    mock_method.return_value = operation
+
+    create_pubsub_sink_connector.create_pubsub_sink_connector(
+        PROJECT_ID,
+        REGION,
+        CONNECT_CLUSTER_ID,
+        connector_id,
+        "GMK_TOPIC_ID",
+        "org.apache.kafka.connect.storage.StringConverter",
+        "org.apache.kafka.connect.storage.StringConverter",
+        "CPS_TOPIC_ID",
+        "GCP_PROJECT_ID",
+        "1",
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Created Connector" in out
+    assert connector_id in out
+    mock_method.assert_called_once()
+
+
+@mock.patch(
+    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
+)
+def test_create_cloud_storage_sink_connector(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+) -> None:
+    connector_id = "GCS_SINK_CONNECTOR_ID"
+    operation = mock.MagicMock(spec=Operation)
+    connector = managedkafka_v1.types.Connector()
+    connector.name = connector_id
+    operation.result = mock.MagicMock(return_value=connector)
+    mock_method.return_value = operation
+
+    create_cloud_storage_sink_connector.create_cloud_storage_sink_connector(
+        PROJECT_ID,
+        REGION,
+        CONNECT_CLUSTER_ID,
+        connector_id,
+        "GMK_TOPIC_ID",
+        "GCS_BUCKET_NAME",
+        "1",
+        "json",
+        "org.apache.kafka.connect.json.JsonConverter",
+        "false",
+        "org.apache.kafka.connect.storage.StringConverter",
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Created Connector" in out
+    assert connector_id in out
+    mock_method.assert_called_once()
+
+
+@mock.patch(
+    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
+)
+def test_create_bigquery_sink_connector(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+) -> None:
+    connector_id = "BQ_SINK_CONNECTOR_ID"
+    operation = mock.MagicMock(spec=Operation)
+    connector = managedkafka_v1.types.Connector()
+    connector.name = connector_id
+    operation.result = mock.MagicMock(return_value=connector)
+    mock_method.return_value = operation
+
+    create_bigquery_sink_connector.create_bigquery_sink_connector(
+        PROJECT_ID,
+        REGION,
+        CONNECT_CLUSTER_ID,
+        connector_id,
+        "GMK_TOPIC_ID",
+        "3",
"org.apache.kafka.connect.storage.StringConverter", + "org.apache.kafka.connect.json.JsonConverter", + "false", + "BQ_DATASET_ID", + ) + + out, _ = capsys.readouterr() + assert "Created Connector" in out + assert connector_id in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py new file mode 100644 index 00000000000..18cd5d186ba --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py @@ -0,0 +1,83 @@ +def create_bigquery_sink_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + topics: str, + tasks_max: str, + key_converter: str, + value_converter: str, + value_converter_schemas_enable: str, + default_dataset: str, +) -> None: + """ + Create a BigQuery Sink connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: Name of the connector. + topics: Kafka topics to read from. + tasks_max: Maximum number of tasks. + key_converter: Key converter class. + value_converter: Value converter class. + value_converter_schemas_enable: Enable schemas for value converter. + default_dataset: BigQuery dataset ID. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # TODO(developer): Update with your config values. Here is a sample configuration: + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "BQ_SINK_CONNECTOR_ID" + # topics = "GMK_TOPIC_ID" + # tasks_max = "3" + # key_converter = "org.apache.kafka.connect.storage.StringConverter" + # value_converter = "org.apache.kafka.connect.json.JsonConverter" + # value_converter_schemas_enable = "false" + # default_dataset = "BQ_DATASET_ID" + + # [START managedkafka_create_bigquery_sink_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest + + connect_client = ManagedKafkaConnectClient() + parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + + configs = { + "name": connector_id, + "project": project_id, + "topics": topics, + "tasks.max": tasks_max, + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "key.converter": key_converter, + "value.converter": value_converter, + "value.converter.schemas.enable": value_converter_schemas_enable, + "defaultDataset": default_dataset, + } + + connector = Connector() + connector.name = connector_id + connector.configs = configs + + request = CreateConnectorRequest( + parent=parent, + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + # [END managedkafka_create_bigquery_sink_connector] diff --git a/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py 
b/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py new file mode 100644 index 00000000000..2520588afec --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py @@ -0,0 +1,87 @@ +def create_cloud_storage_sink_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + topics: str, + gcs_bucket_name: str, + tasks_max: str, + format_output_type: str, + value_converter: str, + value_converter_schemas_enable: str, + key_converter: str, +) -> None: + """ + Create a Cloud Storage Sink connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: Name of the connector. + topics: Kafka topics to read from. + gcs_bucket_name: Google Cloud Storage bucket name. + tasks_max: Maximum number of tasks. + format_output_type: Output format type. + value_converter: Value converter class. + value_converter_schemas_enable: Enable schemas for value converter. + key_converter: Key converter class. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # TODO(developer): Update with your config values. Here is a sample configuration: + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "GCS_SINK_CONNECTOR_ID" + # topics = "GMK_TOPIC_ID" + # gcs_bucket_name = "GCS_BUCKET_NAME" + # tasks_max = "1" + # format_output_type = "json" + # value_converter = "org.apache.kafka.connect.json.JsonConverter" + # value_converter_schemas_enable = "false" + # key_converter = "org.apache.kafka.connect.storage.StringConverter" + + # [START managedkafka_create_cloud_storage_sink_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest + + connect_client = ManagedKafkaConnectClient() + parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + + configs = { + "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector", + "tasks.max": tasks_max, + "topics": topics, + "gcs.bucket.name": gcs_bucket_name, + "gcs.credentials.default": "true", + "format.output.type": format_output_type, + "name": connector_id, + "value.converter": value_converter, + "value.converter.schemas.enable": value_converter_schemas_enable, + "key.converter": key_converter, + } + + connector = Connector() + connector.name = connector_id + connector.configs = configs + + request = CreateConnectorRequest( + parent=parent, + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + # [END managedkafka_create_cloud_storage_sink_connector] diff --git a/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py b/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py new file mode 100644 index 00000000000..a3e10e1c7c4 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py @@ -0,0 +1,112 @@ +def 
create_mirrormaker_connector(
+    project_id: str,
+    region: str,
+    connect_cluster_id: str,
+    connector_id: str,
+    topics: str,
+    source_cluster_alias: str,
+    target_cluster_alias: str,
+    source_bootstrap_servers: str,
+    target_bootstrap_servers: str,
+    offset_syncs_topic_replication_factor: str,
+    source_cluster_security_protocol: str,
+    source_cluster_sasl_mechanism: str,
+    source_cluster_sasl_login_callback_handler_class: str,
+    source_cluster_sasl_jaas_config: str,
+    target_cluster_security_protocol: str,
+    target_cluster_sasl_mechanism: str,
+    target_cluster_sasl_login_callback_handler_class: str,
+    target_cluster_sasl_jaas_config: str,
+) -> None:
+    """
+    Create a MirrorMaker 2.0 connector with SASL/OAUTHBEARER security.
+
+    Args:
+        project_id: Google Cloud project ID.
+        region: Cloud region.
+        connect_cluster_id: ID of the Kafka Connect cluster.
+        connector_id: Name of the connector.
+        topics: Topics to mirror.
+        source_cluster_alias: Alias for the source cluster.
+        target_cluster_alias: Alias for the target cluster.
+        source_bootstrap_servers: Source cluster bootstrap servers.
+        target_bootstrap_servers: Target cluster bootstrap servers.
+        offset_syncs_topic_replication_factor: Replication factor for the offset-syncs topic.
+        source_cluster_security_protocol: Security protocol for source cluster.
+        source_cluster_sasl_mechanism: SASL mechanism for source cluster.
+        source_cluster_sasl_login_callback_handler_class: SASL login callback handler class for source cluster.
+        source_cluster_sasl_jaas_config: SASL JAAS config for source cluster.
+        target_cluster_security_protocol: Security protocol for target cluster.
+        target_cluster_sasl_mechanism: SASL mechanism for target cluster.
+        target_cluster_sasl_login_callback_handler_class: SASL login callback handler class for target cluster.
+        target_cluster_sasl_jaas_config: SASL JAAS config for target cluster.
+
+    Raises:
+        This method will raise the GoogleAPICallError exception if the operation errors.
+    """
+    # TODO(developer): Update with your config values.
Here is a sample configuration:
+    # project_id = "my-project-id"
+    # region = "us-central1"
+    # connect_cluster_id = "my-connect-cluster"
+    # connector_id = "MM2_CONNECTOR_ID"
+    # topics = "GMK_TOPIC_NAME"
+    # source_cluster_alias = "source"
+    # target_cluster_alias = "target"
+    # source_bootstrap_servers = "GMK_SOURCE_CLUSTER_DNS"
+    # target_bootstrap_servers = "GMK_TARGET_CLUSTER_DNS"
+    # offset_syncs_topic_replication_factor = "1"
+    # source_cluster_security_protocol = "SASL_SSL"
+    # source_cluster_sasl_mechanism = "OAUTHBEARER"
+    # source_cluster_sasl_login_callback_handler_class = "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
+    # source_cluster_sasl_jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
+    # target_cluster_security_protocol = "SASL_SSL"
+    # target_cluster_sasl_mechanism = "OAUTHBEARER"
+    # target_cluster_sasl_login_callback_handler_class = "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
+    # target_cluster_sasl_jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
+
+    # [START managedkafka_create_mirrormaker2_connector]
+    from google.api_core.exceptions import GoogleAPICallError
+    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
+        ManagedKafkaConnectClient,
+    )
+    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
+
+    connect_client = ManagedKafkaConnectClient()
+    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
+
+    configs = {
+        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
+        "source.cluster.alias": source_cluster_alias,
+        "target.cluster.alias": target_cluster_alias,
+        "topics": topics,
+        "source.cluster.bootstrap.servers": source_bootstrap_servers,
+        "target.cluster.bootstrap.servers": target_bootstrap_servers,
+        "offset-syncs.topic.replication.factor": offset_syncs_topic_replication_factor,
+        "source.cluster.security.protocol": source_cluster_security_protocol,
+        "source.cluster.sasl.mechanism": source_cluster_sasl_mechanism,
+        "source.cluster.sasl.login.callback.handler.class": source_cluster_sasl_login_callback_handler_class,
+        "source.cluster.sasl.jaas.config": source_cluster_sasl_jaas_config,
+        "target.cluster.security.protocol": target_cluster_security_protocol,
+        "target.cluster.sasl.mechanism": target_cluster_sasl_mechanism,
+        "target.cluster.sasl.login.callback.handler.class": target_cluster_sasl_login_callback_handler_class,
+        "target.cluster.sasl.jaas.config": target_cluster_sasl_jaas_config,
+    }
+
+    connector = Connector()
+    connector.name = connector_id
+    connector.configs = configs
+
+    request = CreateConnectorRequest(
+        parent=parent,
+        connector_id=connector_id,
+        connector=connector,
+    )
+
+    try:
+        operation = connect_client.create_connector(request=request)
+        print(f"Waiting for operation {operation.operation.name} to complete...")
+        response = operation.result()
+        print("Created Connector:", response)
+    except GoogleAPICallError as e:
+        print(f"The operation failed with error: {e}")
+    # [END managedkafka_create_mirrormaker2_connector]
diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
new file mode 100644
index 00000000000..ff542310406
--- /dev/null
+++ b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
@@ -0,0 +1,82 @@
+def create_pubsub_sink_connector(
+    project_id: str,
+    region: str,
+    connect_cluster_id: str,
+    connector_id: str,
+    topics: str,
+    value_converter: str,
+    key_converter: str,
+    cps_topic: str,
+    cps_project: str,
+    tasks_max: str,
+) -> None:
+    """
+    Create a Pub/Sub Sink connector.
+
+    Args:
+        project_id: Google Cloud project ID.
+        region: Cloud region.
+        connect_cluster_id: ID of the Kafka Connect cluster.
+        connector_id: Name of the connector.
+        topics: Kafka topics to read from.
+        value_converter: Value converter class.
+        key_converter: Key converter class.
+        cps_topic: Cloud Pub/Sub topic ID.
+        cps_project: Cloud Pub/Sub project ID.
+        tasks_max: Maximum number of tasks.
+
+    Raises:
+        GoogleAPICallError: If the operation fails or the timeout is reached
+        before the operation completes.
+    """
+    # TODO(developer): Update with your config values. Here is a sample configuration:
+    # project_id = "my-project-id"
+    # region = "us-central1"
+    # connect_cluster_id = "my-connect-cluster"
+    # connector_id = "CPS_SINK_CONNECTOR_ID"
+    # topics = "GMK_TOPIC_ID"
+    # value_converter = "org.apache.kafka.connect.storage.StringConverter"
+    # key_converter = "org.apache.kafka.connect.storage.StringConverter"
+    # cps_topic = "CPS_TOPIC_ID"
+    # cps_project = "GCP_PROJECT_ID"
+    # tasks_max = "1"
+
+    # [START managedkafka_create_pubsub_sink_connector]
+    from google.api_core.exceptions import GoogleAPICallError
+    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
+        ManagedKafkaConnectClient,
+    )
+    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
+
+    connect_client = ManagedKafkaConnectClient()
+    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
+
+    configs = {
+        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
+        "name": connector_id,
+        "tasks.max": tasks_max,
+        "topics": topics,
+        "value.converter": value_converter,
+        "key.converter": key_converter,
+        "cps.topic": cps_topic,
+        "cps.project": cps_project,
+    }
+
+    connector = Connector()
+    connector.name = connector_id
+    connector.configs = configs
+
+    request = CreateConnectorRequest(
+        parent=parent,
+        connector_id=connector_id,
+        connector=connector,
+    )
+
+    try:
+        operation = connect_client.create_connector(request=request)
+        print(f"Waiting for operation {operation.operation.name} to complete...")
+        response = operation.result()
+        print("Created Connector:", response)
+    except GoogleAPICallError as e:
+        print(f"The operation failed with error: {e}")
+    # [END managedkafka_create_pubsub_sink_connector]
diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
new file mode 100644
index 00000000000..cfec9aca724
--- /dev/null
+++ b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
@@ -0,0 +1,82 @@
+def create_pubsub_source_connector(
+    project_id: str,
+    region: str,
+    connect_cluster_id: str,
+    connector_id: str,
+    kafka_topic: str,
+    cps_subscription: str,
+    cps_project: str,
+    tasks_max: str,
+    value_converter: str,
+    key_converter: str,
+) -> None:
+    """
+    Create a Pub/Sub Source connector.
+
+    Args:
+        project_id: Google Cloud project ID.
+        region: Cloud region.
+        connect_cluster_id: ID of the Kafka Connect cluster.
+        connector_id: Name of the connector.
+        kafka_topic: Kafka topic to publish to.
+        cps_subscription: Cloud Pub/Sub subscription ID.
+        cps_project: Cloud Pub/Sub project ID.
+        tasks_max: Maximum number of tasks.
+        value_converter: Value converter class.
+        key_converter: Key converter class.
+
+    Raises:
+        GoogleAPICallError: If the operation fails or the timeout is reached
+        before the operation completes.
+    """
+    # TODO(developer): Update with your config values. Here is a sample configuration:
+    # project_id = "my-project-id"
+    # region = "us-central1"
+    # connect_cluster_id = "my-connect-cluster"
+    # connector_id = "CPS_SOURCE_CONNECTOR_ID"
+    # kafka_topic = "GMK_TOPIC_ID"
+    # cps_subscription = "CPS_SUBSCRIPTION_ID"
+    # cps_project = "GCP_PROJECT_ID"
+    # tasks_max = "1"
+    # value_converter = "org.apache.kafka.connect.converters.ByteArrayConverter"
+    # key_converter = "org.apache.kafka.connect.storage.StringConverter"
+
+    # [START managedkafka_create_pubsub_source_connector]
+    from google.api_core.exceptions import GoogleAPICallError
+    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
+        ManagedKafkaConnectClient,
+    )
+    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
+
+    connect_client = ManagedKafkaConnectClient()
+    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
+
+    configs = {
+        "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
+        "name": connector_id,
+        "tasks.max": tasks_max,
+        "kafka.topic": kafka_topic,
+        "cps.subscription": cps_subscription,
+        "cps.project": cps_project,
+        "value.converter": value_converter,
+        "key.converter": key_converter,
+    }
+
+    connector = Connector()
+    connector.name = connector_id
+    connector.configs = configs
+
+    request = CreateConnectorRequest(
+        parent=parent,
+        connector_id=connector_id,
+        connector=connector,
+    )
+
+    try:
+        operation = connect_client.create_connector(request=request)
+        print(f"Waiting for operation {operation.operation.name} to complete...")
+        response = operation.result()
+        print("Created Connector:", response)
+    except GoogleAPICallError as e:
+        print(f"The operation failed with error: {e}")
+    # [END managedkafka_create_pubsub_source_connector]
diff --git a/managedkafka/snippets/connect/connectors/requirements.txt b/managedkafka/snippets/connect/connectors/requirements.txt
new file mode 100644
index 00000000000..5f372e81c41
--- /dev/null
+++ b/managedkafka/snippets/connect/connectors/requirements.txt
@@ -0,0 +1,6 @@
+protobuf==5.29.4
+pytest==8.2.2
+google-api-core==2.23.0
+google-auth==2.38.0
+google-cloud-managedkafka==0.1.12
+googleapis-common-protos==1.66.0

From bc121bdb2ee68919de6e939d418d8ed2fdb19f64 Mon Sep 17 00:00:00 2001
From: salmany
Date: Tue, 29 Jul 2025 00:49:46 +0000
Subject: [PATCH 12/19] Fix import statements and add headers.
---
 .../connect/connectors/connectors_test.py          | 15 +++++++++++++++
 .../connectors/create_bigquery_sink_connector.py   | 14 ++++++++++++++
 .../create_cloud_storage_sink_connector.py         | 14 ++++++++++++++
 .../connectors/create_mirrormaker_connector.py     | 14 ++++++++++++++
 .../connectors/create_pubsub_sink_connector.py     | 14 ++++++++++++++
 .../connectors/create_pubsub_source_connector.py   | 14 ++++++++++++++
 6 files changed, 85 insertions(+)

diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py
index d8d9690f1c3..12ed09805c1 100644
--- a/managedkafka/snippets/connect/connectors/connectors_test.py
+++ b/managedkafka/snippets/connect/connectors/connectors_test.py
@@ -1,5 +1,20 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from unittest import mock
 from unittest.mock import MagicMock
+
 from google.api_core.operation import Operation
 from google.cloud import managedkafka_v1
 import pytest
diff --git a/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py
index 18cd5d186ba..b6719e82ea8 100644
--- a/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py
@@ -1,3 +1,17 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 def create_bigquery_sink_connector(
     project_id: str,
     region: str,
diff --git a/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py b/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py
index 2520588afec..f46dd274571 100644
--- a/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py
@@ -1,3 +1,17 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 def create_cloud_storage_sink_connector(
     project_id: str,
     region: str,
diff --git a/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py b/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py
index a3e10e1c7c4..21dee00404f 100644
--- a/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py
@@ -1,3 +1,17 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 def create_mirrormaker_connector(
     project_id: str,
     region: str,
diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
index ff542310406..f2c6b247add 100644
--- a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
@@ -1,3 +1,17 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 def create_pubsub_sink_connector(
     project_id: str,
     region: str,
diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
index cfec9aca724..62c871aaae5 100644
--- a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
@@ -1,3 +1,17 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 def create_pubsub_source_connector(
     project_id: str,
     region: str,

From b9a15efb09d55ee5a40e22717346842fdc73de98 Mon Sep 17 00:00:00 2001
From: salmany
Date: Thu, 7 Aug 2025 22:25:19 +0000
Subject: [PATCH 13/19] Fix sample configs and update MM2 connector.

---
 .../connect/connectors/connectors_test.py          | 20 +++--
 .../create_cloud_storage_sink_connector.py         |  2 +-
 ...=> create_mirrormaker_source_connector.py}      | 50 ++++---------------
 .../create_pubsub_sink_connector.py                |  2 +-
 .../create_pubsub_source_connector.py              |  2 +-
 .../connect/connectors/noxfile_config.py           | 42 ++++++++++++++++
 6 files changed, 60 insertions(+), 58 deletions(-)
 rename managedkafka/snippets/connect/connectors/{create_mirrormaker_connector.py => create_mirrormaker_source_connector.py} (52%)
 create mode 100644 managedkafka/snippets/connect/connectors/noxfile_config.py

diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py
index 12ed09805c1..7e3101af7f5 100644
--- a/managedkafka/snippets/connect/connectors/connectors_test.py
+++ b/managedkafka/snippets/connect/connectors/connectors_test.py
@@ -21,7 +21,7 @@
 
 import create_bigquery_sink_connector
 import create_cloud_storage_sink_connector
-import create_mirrormaker_connector
+import create_mirrormaker_source_connector
 import create_pubsub_sink_connector
 import create_pubsub_source_connector
 
@@ -44,25 +44,17 @@ def test_create_mirrormaker2_connector(
     operation.result = mock.MagicMock(return_value=connector)
     mock_method.return_value = operation
 
-    create_mirrormaker_connector.create_mirrormaker_connector(
+    create_mirrormaker_source_connector.create_mirrormaker_source_connector(
         PROJECT_ID,
         REGION,
         CONNECT_CLUSTER_ID,
+        "3",
         connector_id,
         "GMK_TOPIC_NAME",
         "source",
         "target",
         "GMK_SOURCE_CLUSTER_DNS",
         "GMK_TARGET_CLUSTER_DNS",
-        "1",
-        "SASL_SSL",
-        "OAUTHBEARER",
-        "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
-        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
-        "SASL_SSL",
-        "OAUTHBEARER",
-        "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
-        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
     )
 
     out, _ = capsys.readouterr()
@@ -93,7 +85,7 @@ def test_create_pubsub_source_connector(
         "GMK_TOPIC_ID",
         "CPS_SUBSCRIPTION_ID",
         "GCP_PROJECT_ID",
-        "1",
+        "3",
         "org.apache.kafka.connect.converters.ByteArrayConverter",
         "org.apache.kafka.connect.storage.StringConverter",
     )
@@ -128,7 +120,7 @@ def test_create_pubsub_sink_connector(
         "org.apache.kafka.connect.storage.StringConverter",
         "CPS_TOPIC_ID",
         "GCP_PROJECT_ID",
-        "1",
+        "3",
     )
 
     out, _ = capsys.readouterr()
@@ -158,7 +150,7 @@ def test_create_cloud_storage_sink_connector(
         connector_id,
         "GMK_TOPIC_ID",
         "GCS_BUCKET_NAME",
-        "1",
+        "3",
         "json",
         "org.apache.kafka.connect.json.JsonConverter",
         "false",
diff --git a/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py b/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py
index f46dd274571..8e6d7bc2c70 100644
--- a/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py
@@ -52,7 +52,7 @@ def create_cloud_storage_sink_connector(
     # connector_id = "GCS_SINK_CONNECTOR_ID"
     # topics = "GMK_TOPIC_ID"
     # gcs_bucket_name = "GCS_BUCKET_NAME"
-    # tasks_max = "1"
+    # tasks_max = "3"
     # format_output_type = "json"
     # value_converter = "org.apache.kafka.connect.json.JsonConverter"
     # value_converter_schemas_enable = "false"
diff --git a/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py b/managedkafka/snippets/connect/connectors/create_mirrormaker_source_connector.py
similarity index 52%
rename from managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py
rename to managedkafka/snippets/connect/connectors/create_mirrormaker_source_connector.py
index 21dee00404f..e897cd3c04b 100644
--- a/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_mirrormaker_source_connector.py
@@ -12,48 +12,32 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-def create_mirrormaker_connector(
+def create_mirrormaker_source_connector(
     project_id: str,
     region: str,
     connect_cluster_id: str,
+    tasks_max: str,
     connector_id: str,
     topics: str,
     source_cluster_alias: str,
     target_cluster_alias: str,
     source_bootstrap_servers: str,
     target_bootstrap_servers: str,
-    offset_syncs_topic_replication_factor: str,
-    source_cluster_security_protocol: str,
-    source_cluster_sasl_mechanism: str,
-    source_cluster_sasl_login_callback_handler_class: str,
-    source_cluster_sasl_jaas_config: str,
-    target_cluster_security_protocol: str,
-    target_cluster_sasl_mechanism: str,
-    target_cluster_sasl_login_callback_handler_class: str,
-    target_cluster_sasl_jaas_config: str,
 ) -> None:
     """
-    Create a MirrorMaker 2.0 connector with SASL/OAUTHBEARER security.
+    Create a MirrorMaker 2.0 Source connector.
 
     Args:
         project_id: Google Cloud project ID.
         region: Cloud region.
         connect_cluster_id: ID of the Kafka Connect cluster.
+        tasks_max: Controls the level of parallelism for the connector.
         connector_id: Name of the connector.
         topics: Topics to mirror.
         source_cluster_alias: Alias for the source cluster.
         target_cluster_alias: Alias for the target cluster.
         source_bootstrap_servers: Source cluster bootstrap servers.
-        target_bootstrap_servers: Target cluster bootstrap servers.
-        offset_syncs_topic_replication_factor: Replication factor for offset-syncs topic.
-        source_cluster_security_protocol: Security protocol for source cluster.
-        source_cluster_sasl_mechanism: SASL mechanism for source cluster.
-        source_cluster_sasl_login_callback_handler_class: SASL login callback handler class for source cluster.
-        source_cluster_sasl_jaas_config: SASL JAAS config for source cluster.
-        target_cluster_security_protocol: Security protocol for target cluster.
-        target_cluster_sasl_mechanism: SASL mechanism for target cluster.
-        target_cluster_sasl_login_callback_handler_class: SASL login callback handler class for target cluster.
-        target_cluster_sasl_jaas_config: SASL JAAS config for target cluster.
+        target_bootstrap_servers: Target cluster bootstrap servers. This is usually the primary cluster.
 
     Raises:
         GoogleAPICallError: If the operation fails.
@@ -62,23 +46,15 @@ def create_mirrormaker_connector(
     # project_id = "my-project-id"
     # region = "us-central1"
     # connect_cluster_id = "my-connect-cluster"
+    # tasks_max = "3"
     # connector_id = "MM2_CONNECTOR_ID"
     # topics = "GMK_TOPIC_NAME"
     # source_cluster_alias = "source"
     # target_cluster_alias = "target"
     # source_bootstrap_servers = "GMK_SOURCE_CLUSTER_DNS"
     # target_bootstrap_servers = "GMK_TARGET_CLUSTER_DNS"
-    # offset_syncs_topic_replication_factor = "1"
-    # source_cluster_security_protocol = "SASL_SSL"
-    # source_cluster_sasl_mechanism = "OAUTHBEARER"
-    # source_cluster_sasl_login_callback_handler_class = "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
-    # source_cluster_sasl_jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
-    # target_cluster_security_protocol = "SASL_SSL"
-    # target_cluster_sasl_mechanism = "OAUTHBEARER"
-    # target_cluster_sasl_login_callback_handler_class = "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
-    # target_cluster_sasl_jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
 
-    # [START managedkafka_create_mirrormaker2_connector]
+    # [START managedkafka_create_mirrormaker2_source_connector]
     from google.api_core.exceptions import GoogleAPICallError
     from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
         ManagedKafkaConnectClient,
@@ -92,18 +68,10 @@ def create_mirrormaker_connector(
         "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
         "source.cluster.alias": source_cluster_alias,
         "target.cluster.alias": target_cluster_alias,
+        "tasks.max": tasks_max,
         "topics": topics,
         "source.cluster.bootstrap.servers": source_bootstrap_servers,
         "target.cluster.bootstrap.servers": target_bootstrap_servers,
-        "offset-syncs.topic.replication.factor": offset_syncs_topic_replication_factor,
-        "source.cluster.security.protocol": source_cluster_security_protocol,
-        "source.cluster.sasl.mechanism": source_cluster_sasl_mechanism,
-        "source.cluster.sasl.login.callback.handler.class": source_cluster_sasl_login_callback_handler_class,
-        "source.cluster.sasl.jaas.config": source_cluster_sasl_jaas_config,
-        "target.cluster.security.protocol": target_cluster_security_protocol,
-        "target.cluster.sasl.mechanism": target_cluster_sasl_mechanism,
-        "target.cluster.sasl.login.callback.handler.class": target_cluster_sasl_login_callback_handler_class,
-        "target.cluster.sasl.jaas.config": target_cluster_sasl_jaas_config,
     }
 
     connector = Connector()
@@ -123,4 +91,4 @@ def create_mirrormaker_connector(
         print("Created Connector:", response)
     except GoogleAPICallError as e:
         print(f"The operation failed with error: {e}")
-    # [END managedkafka_create_mirrormaker2_connector]
+    # [END managedkafka_create_mirrormaker2_source_connector]
diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
index f2c6b247add..2742d8166d5 100644
--- a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
@@ -53,7 +53,7 @@ def create_pubsub_sink_connector(
     # key_converter = "org.apache.kafka.connect.storage.StringConverter"
     # cps_topic = "CPS_TOPIC_ID"
     # cps_project = "GCP_PROJECT_ID"
-    # tasks_max = "1"
+    # tasks_max = "3"
 
     # [START managedkafka_create_pubsub_sink_connector]
     from google.api_core.exceptions import GoogleAPICallError
diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
index 62c871aaae5..d5c2acc7013 100644
--- a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
+++ b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
@@ -51,7 +51,7 @@ def create_pubsub_source_connector(
     # kafka_topic = "GMK_TOPIC_ID"
     # cps_subscription = "CPS_SUBSCRIPTION_ID"
     # cps_project = "GCP_PROJECT_ID"
-    # tasks_max = "1"
+    # tasks_max = "3"
     # value_converter = "org.apache.kafka.connect.converters.ByteArrayConverter"
     # key_converter = "org.apache.kafka.connect.storage.StringConverter"
 
diff --git a/managedkafka/snippets/connect/connectors/noxfile_config.py b/managedkafka/snippets/connect/connectors/noxfile_config.py
new file mode 100644
index 00000000000..2a0f115c38f
--- /dev/null
+++ b/managedkafka/snippets/connect/connectors/noxfile_config.py
@@ -0,0 +1,42 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Default TEST_CONFIG_OVERRIDE for python repos.
+
+# You can copy this file into your directory, then it will be imported from
+# the noxfile.py.
+
+# The source of truth:
+# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py
+
+TEST_CONFIG_OVERRIDE = {
+    # You can opt out from the test for specific Python versions.
+    "ignored_versions": ["2.7", "3.7", "3.8", "3.10", "3.11", "3.12"],
+    # Old samples are opted out of enforcing Python type hints
+    # All new samples should feature them
+    "enforce_type_hints": True,
+    # An envvar key for determining the project id to use. Change it
+    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
+    # build specific Cloud project. You can also use your own string
+    # to use your own Cloud project.
+    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
+    # If you need to use a specific version of pip,
+    # change pip_version_override to the string representation
+    # of the version number, for example, "20.2.4"
+    "pip_version_override": None,
+    # A dictionary you want to inject into your test. Don't put any
+    # secrets here. These values will override predefined values.
+    "envs": {},
+}

From a26f7b9aa4d689cd9dfb3ff3ecefa888e56f26c2 Mon Sep 17 00:00:00 2001
From: salmany
Date: Thu, 7 Aug 2025 22:30:38 +0000
Subject: [PATCH 14/19] Remove unwanted local test artifact.

---
 .../connect/connectors/noxfile_config.py | 42 ------------------
 1 file changed, 42 deletions(-)
 delete mode 100644 managedkafka/snippets/connect/connectors/noxfile_config.py

diff --git a/managedkafka/snippets/connect/connectors/noxfile_config.py b/managedkafka/snippets/connect/connectors/noxfile_config.py
deleted file mode 100644
index 2a0f115c38f..00000000000
--- a/managedkafka/snippets/connect/connectors/noxfile_config.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright 2021 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Default TEST_CONFIG_OVERRIDE for python repos.
-
-# You can copy this file into your directory, then it will be imported from
-# the noxfile.py.
-
-# The source of truth:
-# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py
-
-TEST_CONFIG_OVERRIDE = {
-    # You can opt out from the test for specific Python versions.
-    "ignored_versions": ["2.7", "3.7", "3.8", "3.10", "3.11", "3.12"],
-    # Old samples are opted out of enforcing Python type hints
-    # All new samples should feature them
-    "enforce_type_hints": True,
-    # An envvar key for determining the project id to use. Change it
-    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
-    # build specific Cloud project. You can also use your own string
-    # to use your own Cloud project.
-    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
-    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
-    # If you need to use a specific version of pip,
-    # change pip_version_override to the string representation
-    # of the version number, for example, "20.2.4"
-    "pip_version_override": None,
-    # A dictionary you want to inject into your test. Don't put any
-    # secrets here. These values will override predefined values.
-    "envs": {},
-}

From af7a6281500c02e04a732576da31af2d334551d8 Mon Sep 17 00:00:00 2001
From: salmany
Date: Tue, 29 Jul 2025 01:00:38 +0000
Subject: [PATCH 15/19] Adds requirements.txt file for samples.

As per the [Authoring Guide](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#dependencies),
each sample is required to have a requirements.txt file that lists the
dependencies needed to run the sample.
---
 managedkafka/snippets/connect/clusters/requirements.txt | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 managedkafka/snippets/connect/clusters/requirements.txt

diff --git a/managedkafka/snippets/connect/clusters/requirements.txt b/managedkafka/snippets/connect/clusters/requirements.txt
new file mode 100644
index 00000000000..5f372e81c41
--- /dev/null
+++ b/managedkafka/snippets/connect/clusters/requirements.txt
@@ -0,0 +1,6 @@
+protobuf==5.29.4
+pytest==8.2.2
+google-api-core==2.23.0
+google-auth==2.38.0
+google-cloud-managedkafka==0.1.12
+googleapis-common-protos==1.66.0

From 1ae987f5ba86139696df3c03602a467445d33ca7 Mon Sep 17 00:00:00 2001
From: Salman Yousaf <37085288+salmany@users.noreply.github.com>
Date: Wed, 30 Jul 2025 17:03:12 -0400
Subject: [PATCH 16/19] Remove timeout in update_connect_cluster.py

Remove timeout to align with Managed Kafka Cluster update example:
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/cdae4cacfe8f9612e554af11ef72bc8d34765ada/managedkafka/snippets/clusters/update_cluster.py#L60
---
 .../snippets/connect/clusters/update_connect_cluster.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py
index f7ed0d0248e..16587046949 100644
--- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py
@@ -63,8 +63,6 @@ def update_connect_cluster(
     try:
         operation = connect_client.update_connect_cluster(request=request)
         print(f"Waiting for operation {operation.operation.name} to complete...")
-        # Updating a Connect cluster can take 10-40 minutes.
-        operation.result(timeout=3000)
         response = operation.result()
         print("Updated Connect cluster:", response)
     except GoogleAPICallError as e:

From 3e788a6f26bb43053f56003a717b24cb6f8ae98a Mon Sep 17 00:00:00 2001
From: salmany
Date: Mon, 4 Aug 2025 18:27:09 +0000
Subject: [PATCH 17/19] Fix CPU and memory values for Cluster creation.

---
 .../snippets/connect/clusters/create_connect_cluster.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
index 3187841e86a..f0175f19eff 100644
--- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
@@ -50,8 +50,8 @@ def create_connect_cluster(
     # connect_cluster_id = "my-connect-cluster"
     # kafka_cluster_id = "my-kafka-cluster"
     # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
-    # cpu = 3
-    # memory_bytes = 3221225472 # 3 GiB
+    # cpu = 12
+    # memory_bytes = 12884901888 # 12 GiB
 
     connect_client = ManagedKafkaConnectClient()
     kafka_client = managedkafka_v1.ManagedKafkaClient()

From e908ecdb51bf5a9222b2db2cbb1217b5787aed0f Mon Sep 17 00:00:00 2001
From: salmany
Date: Thu, 7 Aug 2025 21:13:25 +0000
Subject: [PATCH 18/19] Fixed comment for minimum vCPU.
---
 .../snippets/connect/clusters/create_connect_cluster.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
index f0175f19eff..26a4e926ac4 100644
--- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
@@ -31,7 +31,7 @@ def create_connect_cluster(
         connect_cluster_id: ID of the Kafka Connect cluster.
         kafka_cluster_id: The ID of the primary Managed Service for Apache Kafka cluster.
         primary_subnet: The primary VPC subnet for the Connect cluster workers. The expected format is projects/{project_id}/regions/{region}/subnetworks/{subnet_id}.
-        cpu: Number of vCPUs to provision for the cluster. The minimum is 3.
+        cpu: Number of vCPUs to provision for the cluster. The minimum is 12.
         memory_bytes: The memory to provision for the cluster in bytes. Must be between 1 GiB * cpu and 8 GiB * cpu.
 
     Raises:

From 1ede52b12f12b564e579681d115f4c1e9537ec66 Mon Sep 17 00:00:00 2001
From: salmany
Date: Thu, 7 Aug 2025 22:28:31 +0000
Subject: [PATCH 19/19] Fix lint by adding space.

---
 .../snippets/connect/clusters/create_connect_cluster.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
index 26a4e926ac4..c3045ed84d1 100644
--- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py
+++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py
@@ -51,7 +51,7 @@ def create_connect_cluster(
     # kafka_cluster_id = "my-kafka-cluster"
     # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
     # cpu = 12
-    # memory_bytes = 12884901888 # 12 GiB
+    # memory_bytes = 12884901888  # 12 GiB
 
     connect_client = ManagedKafkaConnectClient()
     kafka_client = managedkafka_v1.ManagedKafkaClient()
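
Usage note: the connector samples in this series are plain functions, so once the
patches land they can be exercised from a small driver script. The sketch below is
illustrative only and is not part of the patch series; it assumes the connectors
directory is on PYTHONPATH, that Application Default Credentials are configured,
and that every placeholder value is replaced with a real resource before running.

    # Hypothetical driver for the MirrorMaker 2.0 Source connector sample.
    # All argument values are placeholders, mirroring the sample
    # configuration comments in create_mirrormaker_source_connector.py.
    from create_mirrormaker_source_connector import (
        create_mirrormaker_source_connector,
    )

    create_mirrormaker_source_connector(
        project_id="my-project-id",
        region="us-central1",
        connect_cluster_id="my-connect-cluster",
        tasks_max="3",
        connector_id="MM2_CONNECTOR_ID",
        topics="GMK_TOPIC_NAME",
        source_cluster_alias="source",
        target_cluster_alias="target",
        source_bootstrap_servers="GMK_SOURCE_CLUSTER_DNS",
        target_bootstrap_servers="GMK_TARGET_CLUSTER_DNS",
    )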
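
A quick sanity check on the capacity values used in patches 17-19: 12884901888
bytes is exactly 12 GiB, and the docstring's constraint of 1 GiB to 8 GiB of
memory per vCPU is satisfied at the lower bound. A minimal verification sketch
(illustrative, not part of the series):

    # Verify the sample capacity values from create_connect_cluster.py.
    GIB = 2**30  # one gibibyte in bytes

    cpu = 12  # minimum vCPU count per the patch 18 docstring fix
    memory_bytes = 12 * GIB

    assert memory_bytes == 12884901888  # the literal used in the sample
    # Docstring constraint: between 1 GiB * cpu and 8 GiB * cpu.
    assert 1 * GIB * cpu <= memory_bytes <= 8 * GIB * cpu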