diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py new file mode 100644 index 00000000000..bb3b7295428 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/clusters_test.py @@ -0,0 +1,176 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +from unittest.mock import MagicMock + +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 +import pytest + +import create_connect_cluster # noqa: I100 +import delete_connect_cluster +import get_connect_cluster +import list_connect_clusters +import update_connect_cluster + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +KAFKA_CLUSTER_ID = "test-cluster-id" +CONNECT_CLUSTER_ID = "test-connect-cluster-id" + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connect_cluster" +) +def test_create_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + cpu = 12 + memory_bytes = 12884901900 # 12 GB + primary_subnet = "test-subnet" + operation = mock.MagicMock(spec=Operation) + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + operation.result = mock.MagicMock(return_value=connect_cluster) + mock_method.return_value = operation + + create_connect_cluster.create_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + kafka_cluster_id=KAFKA_CLUSTER_ID, + primary_subnet=primary_subnet, + cpu=cpu, + memory_bytes=memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Created Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connect_cluster" +) +def test_get_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + mock_method.return_value = connect_cluster + + get_connect_cluster.get_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connect_cluster" +) +def test_update_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + new_memory_bytes = 12884901900 # 12 GB + operation = mock.MagicMock(spec=Operation) + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + connect_cluster.capacity_config.memory_bytes = new_memory_bytes + operation.result = mock.MagicMock(return_value=connect_cluster) + mock_method.return_value = operation + + update_connect_cluster.update_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + memory_bytes=new_memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Updated Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + assert str(new_memory_bytes) in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connect_clusters" +) +def test_list_connect_clusters( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + + response = [connect_cluster] + mock_method.return_value = response + + list_connect_clusters.list_connect_clusters( + project_id=PROJECT_ID, + region=REGION, + ) + + out, _ = capsys.readouterr() + assert "Got Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connect_cluster" +) +def test_delete_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + mock_method.return_value = operation + + delete_connect_cluster.delete_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted Connect cluster" in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py new file mode 100644 index 00000000000..c3045ed84d1 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -0,0 +1,93 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, + kafka_cluster_id: str, + primary_subnet: str, + cpu: int, + memory_bytes: int, +) -> None: + """ + Create a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + kafka_cluster_id: The ID of the primary Managed Service for Apache Kafka cluster. + primary_subnet: The primary VPC subnet for the Connect cluster workers. The expected format is projects/{project_id}/regions/{region}/subnetworks/{subnet_id}. + cpu: Number of vCPUs to provision for the cluster. The minimum is 12. + memory_bytes: The memory to provision for the cluster in bytes. Must be between 1 GiB * cpu and 8 GiB * cpu. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # [START managedkafka_create_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig + + # TODO(developer): Update with your values. + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # kafka_cluster_id = "my-kafka-cluster" + # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" + # cpu = 12 + # memory_bytes = 12884901888 # 12 GiB + + connect_client = ManagedKafkaConnectClient() + kafka_client = managedkafka_v1.ManagedKafkaClient() + + parent = connect_client.common_location_path(project_id, region) + kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id) + + connect_cluster = ConnectCluster() + connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + connect_cluster.kafka_cluster = kafka_cluster_path + connect_cluster.capacity_config.vcpu_count = cpu + connect_cluster.capacity_config.memory_bytes = memory_bytes + connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)] + # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration. + # For example: + # connect_cluster.gcp_config.access_config.network_configs = [ + # ConnectNetworkConfig( + # primary_subnet=primary_subnet, + # additional_subnets=additional_subnets, + # dns_domain_names=dns_domain_names, + # ) + # ] + + request = CreateConnectClusterRequest( + parent=parent, + connect_cluster_id=connect_cluster_id, + connect_cluster=connect_cluster, + ) + + try: + operation = connect_client.create_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + # Creating a Connect cluster can take 10-40 minutes. + response = operation.result(timeout=3000) + print("Created Connect cluster:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_create_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py new file mode 100644 index 00000000000..84258fe830f --- /dev/null +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -0,0 +1,58 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def delete_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + Delete a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_delete_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.DeleteConnectClusterRequest( + name=client.connect_cluster_path(project_id, region, connect_cluster_id), + ) + + try: + operation = client.delete_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print("Deleted Connect cluster") + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_delete_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/get_connect_cluster.py b/managedkafka/snippets/connect/clusters/get_connect_cluster.py new file mode 100644 index 00000000000..8dfd39b5958 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/get_connect_cluster.py @@ -0,0 +1,55 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def get_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + Get a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + + Raises: + This method will raise the NotFound exception if the Connect cluster is not found. + """ + # [START managedkafka_get_connect_cluster] + from google.api_core.exceptions import NotFound + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + client = ManagedKafkaConnectClient() + + cluster_path = client.connect_cluster_path(project_id, region, connect_cluster_id) + request = managedkafka_v1.GetConnectClusterRequest( + name=cluster_path, + ) + + try: + cluster = client.get_connect_cluster(request=request) + print("Got Connect cluster:", cluster) + except NotFound as e: + print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e}") + + # [END managedkafka_get_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py new file mode 100644 index 00000000000..749a5267d91 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -0,0 +1,51 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def list_connect_clusters( + project_id: str, + region: str, +) -> None: + """ + List Kafka Connect clusters in a given project ID and region. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + """ + # [START managedkafka_list_connect_clusters] + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.api_core.exceptions import GoogleAPICallError + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.ListConnectClustersRequest( + parent=connect_client.common_location_path(project_id, region), + ) + + response = connect_client.list_connect_clusters(request=request) + for cluster in response: + try: + print("Got Connect cluster:", cluster) + except GoogleAPICallError as e: + print(f"Failed to list Connect clusters with error: {e}") + + # [END managedkafka_list_connect_clusters] diff --git a/managedkafka/snippets/connect/clusters/requirements.txt b/managedkafka/snippets/connect/clusters/requirements.txt new file mode 100644 index 00000000000..5f372e81c41 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/requirements.txt @@ -0,0 +1,6 @@ +protobuf==5.29.4 +pytest==8.2.2 +google-api-core==2.23.0 +google-auth==2.38.0 +google-cloud-managedkafka==0.1.12 +googleapis-common-protos==1.66.0 diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py new file mode 100644 index 00000000000..16587046949 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -0,0 +1,72 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def update_connect_cluster( + project_id: str, region: str, connect_cluster_id: str, memory_bytes: int +) -> None: + """ + Update a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + memory_bytes: The memory to provision for the cluster in bytes. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # [START managedkafka_update_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import ConnectCluster + from google.protobuf import field_mask_pb2 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # memory_bytes = 4295000000 + + connect_client = ManagedKafkaConnectClient() + + connect_cluster = ConnectCluster() + connect_cluster.name = connect_client.connect_cluster_path( + project_id, region, connect_cluster_id + ) + connect_cluster.capacity_config.memory_bytes = memory_bytes + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("capacity_config.memory_bytes") + + # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties. + request = managedkafka_v1.UpdateConnectClusterRequest( + update_mask=update_mask, + connect_cluster=connect_cluster, + ) + + try: + operation = connect_client.update_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + response = operation.result() + print("Updated Connect cluster:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_update_connect_cluster] diff --git a/managedkafka/snippets/requirements.txt b/managedkafka/snippets/requirements.txt index a7da4ff6516..5f372e81c41 100644 --- a/managedkafka/snippets/requirements.txt +++ b/managedkafka/snippets/requirements.txt @@ -2,5 +2,5 @@ protobuf==5.29.4 pytest==8.2.2 google-api-core==2.23.0 google-auth==2.38.0 -google-cloud-managedkafka==0.1.5 +google-cloud-managedkafka==0.1.12 googleapis-common-protos==1.66.0