Merged

28 commits
c928344  Add Managed Kafka Connect code samples for (salmany, Jul 24, 2025)
8f2224a  Update google-cloud-managedkafka version to 0.1.12 (salmany, Jul 24, 2025)
1feb354  Update managedkafka/snippets/connect/clusters/create_connect_cluster.py (salmany, Jul 24, 2025)
5840336  Update managedkafka/snippets/connect/clusters/create_connect_cluster.py (salmany, Jul 24, 2025)
d0df10f  Update managedkafka/snippets/connect/clusters/delete_connect_cluster.py (salmany, Jul 24, 2025)
505759a  Update managedkafka/snippets/connect/clusters/get_connect_cluster.py (salmany, Jul 24, 2025)
d3fba2c  Update managedkafka/snippets/connect/clusters/list_connect_clusters.py (salmany, Jul 24, 2025)
0a85a8b  Update managedkafka/snippets/connect/clusters/update_connect_cluster.py (salmany, Jul 24, 2025)
dfa07d2  Add timeouts and improve error handling. (salmany, Jul 24, 2025)
ab697bf  Addressed PR comments. (salmany, Jul 28, 2025)
3339325  Adds requirements.txt file for samples. (salmany, Jul 29, 2025)
f1bb355  Adds code examples for Managed Kafka Connect (salmany, Jul 30, 2025)
8084f5b  Minor fixes. (salmany, Jul 31, 2025)
328ab0c  Remove redundant file. (salmany, Jul 31, 2025)
cf01686  Merge remote-tracking branch 'origin/main' into pr/salmany/13527 (salmany, Aug 7, 2025)
9769a25  Merge branch 'main' into pr/salmany/13527 (salmany, Sep 7, 2025)
0fc9597  Fix merge issues. (salmany, Sep 7, 2025)
bf5d2c3  Add example for `bootstrap.servers` (salmany, Sep 8, 2025)
831952c  Remove obsolete MM2 connector example. (salmany, Sep 8, 2025)
c4b610f  Clarify use for restart_connector. (salmany, Sep 8, 2025)
b49fa0f  Fix comment on bootstrap.servers for MM2 example. (salmany, Sep 8, 2025)
2635674  Remove obsolete file. (salmany, Sep 8, 2025)
692c41f  Fix variable naming and tasks.max value. (salmany, Sep 10, 2025)
fd5aa27  Fix connectors_test.py linting errors. (salmany, Sep 10, 2025)
561adc5  Fix linting errors for connector samples. (salmany, Sep 10, 2025)
15b0e76  linting (black) (glasnt, Sep 11, 2025)
5a9febe  Remove requirements.txt to fix linting conflicts. (salmany, Sep 12, 2025)
700f6b3  sort order (glasnt, Sep 12, 2025)
managedkafka/snippets/connect/clusters/delete_connect_cluster.py
@@ -41,14 +41,14 @@ def delete_connect_cluster(
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"

client = ManagedKafkaConnectClient()
connect_client = ManagedKafkaConnectClient()

request = managedkafka_v1.DeleteConnectClusterRequest(
name=client.connect_cluster_path(project_id, region, connect_cluster_id),
name=connect_client.connect_cluster_path(project_id, region, connect_cluster_id),
)

try:
operation = client.delete_connect_cluster(request=request)
operation = connect_client.delete_connect_cluster(request=request)
print(f"Waiting for operation {operation.operation.name} to complete...")
operation.result()
print("Deleted Connect cluster")
209 changes: 209 additions & 0 deletions managedkafka/snippets/connect/connectors/connectors_test.py
@@ -20,14 +20,23 @@
import create_mirrormaker2_source_connector
import create_pubsub_sink_connector
import create_pubsub_source_connector
import delete_connector
import get_connector
from google.api_core.operation import Operation
from google.cloud import managedkafka_v1
import list_connectors
import pause_connector
import pytest
import restart_connector
import resume_connector
import stop_connector
import update_connector


PROJECT_ID = "test-project-id"
REGION = "us-central1"
CONNECT_CLUSTER_ID = "test-connect-cluster-id"
CONNECTOR_ID = "test-connector-id"


@mock.patch(
@@ -194,3 +203,203 @@ def test_create_bigquery_sink_connector(
assert "Created Connector" in out
assert connector_id in out
mock_method.assert_called_once()


@mock.patch(
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connectors"
)
def test_list_connectors(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
) -> None:
connector = managedkafka_v1.types.Connector()
connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path(
PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID
)
mock_method.return_value = [connector]

list_connectors.list_connectors(
project_id=PROJECT_ID,
region=REGION,
connect_cluster_id=CONNECT_CLUSTER_ID,
)

out, _ = capsys.readouterr()
assert "Got connector" in out
assert CONNECTOR_ID in out
mock_method.assert_called_once()


@mock.patch(
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connector"
)
def test_get_connector(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
) -> None:
connector = managedkafka_v1.types.Connector()
connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path(
PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID
)
mock_method.return_value = connector

get_connector.get_connector(
project_id=PROJECT_ID,
region=REGION,
connect_cluster_id=CONNECT_CLUSTER_ID,
connector_id=CONNECTOR_ID,
)

out, _ = capsys.readouterr()
assert "Got connector" in out
assert CONNECTOR_ID in out
mock_method.assert_called_once()


@mock.patch(
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connector"
)
def test_update_connector(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
) -> None:
configs = {"tasks.max": "6", "value.converter.schemas.enable": "true"}
operation = mock.MagicMock(spec=Operation)
connector = managedkafka_v1.types.Connector()
connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path(
PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID
)
operation.result = mock.MagicMock(return_value=connector)
mock_method.return_value = operation

update_connector.update_connector(
project_id=PROJECT_ID,
region=REGION,
connect_cluster_id=CONNECT_CLUSTER_ID,
connector_id=CONNECTOR_ID,
configs=configs,
)

out, _ = capsys.readouterr()
assert "Updated connector" in out
assert CONNECTOR_ID in out
mock_method.assert_called_once()


@mock.patch(
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connector"
)
def test_delete_connector(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
) -> None:
operation = mock.MagicMock(spec=Operation)
operation.result = mock.MagicMock(return_value=None)
mock_method.return_value = operation

delete_connector.delete_connector(
project_id=PROJECT_ID,
region=REGION,
connect_cluster_id=CONNECT_CLUSTER_ID,
connector_id=CONNECTOR_ID,
)

out, _ = capsys.readouterr()
assert "Deleted connector" in out
mock_method.assert_called_once()


@mock.patch(
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.pause_connector"
)
def test_pause_connector(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
) -> None:
operation = mock.MagicMock(spec=Operation)
operation.result = mock.MagicMock(return_value=None)
mock_method.return_value = operation

pause_connector.pause_connector(
project_id=PROJECT_ID,
region=REGION,
connect_cluster_id=CONNECT_CLUSTER_ID,
connector_id=CONNECTOR_ID,
)

out, _ = capsys.readouterr()
assert "Paused connector" in out
assert CONNECTOR_ID in out
mock_method.assert_called_once()


@mock.patch(
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.resume_connector"
)
def test_resume_connector(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
) -> None:
operation = mock.MagicMock(spec=Operation)
operation.result = mock.MagicMock(return_value=None)
mock_method.return_value = operation

resume_connector.resume_connector(
project_id=PROJECT_ID,
region=REGION,
connect_cluster_id=CONNECT_CLUSTER_ID,
connector_id=CONNECTOR_ID,
)

out, _ = capsys.readouterr()
assert "Resumed connector" in out
assert CONNECTOR_ID in out
mock_method.assert_called_once()


@mock.patch(
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.stop_connector"
)
def test_stop_connector(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
) -> None:
operation = mock.MagicMock(spec=Operation)
operation.result = mock.MagicMock(return_value=None)
mock_method.return_value = operation

stop_connector.stop_connector(
project_id=PROJECT_ID,
region=REGION,
connect_cluster_id=CONNECT_CLUSTER_ID,
connector_id=CONNECTOR_ID,
)

out, _ = capsys.readouterr()
assert "Stopped connector" in out
assert CONNECTOR_ID in out
mock_method.assert_called_once()


@mock.patch(
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.restart_connector"
)
def test_restart_connector(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
) -> None:
operation = mock.MagicMock(spec=Operation)
operation.result = mock.MagicMock(return_value=None)
mock_method.return_value = operation

restart_connector.restart_connector(
project_id=PROJECT_ID,
region=REGION,
connect_cluster_id=CONNECT_CLUSTER_ID,
connector_id=CONNECTOR_ID,
)

out, _ = capsys.readouterr()
assert "Restarted connector" in out
assert CONNECTOR_ID in out
mock_method.assert_called_once()
managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


def create_bigquery_sink_connector(
project_id: str,
region: str,
managedkafka/snippets/connect/connectors/create_mirrormaker2_source_connector.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


def create_mirrormaker2_source_connector(
project_id: str,
region: str,
@@ -75,8 +76,9 @@ def create_mirrormaker2_source_connector(
"target.cluster.alias": target_cluster_alias, # This is usually the primary cluster.
# Replicate all topics from the source
"topics": topics,
# The value for bootstrap.servers is a comma-separated list of hostname:port pairs
# for one or more Kafka brokers in the source/target cluster.
# The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
# the source/target cluster.
# For example: "kafka-broker:9092"
"source.cluster.bootstrap.servers": source_bootstrap_servers,
"target.cluster.bootstrap.servers": target_bootstrap_servers,
# You can define an exclusion policy for topics as follows:
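For reference, a minimal hypothetical sketch of how a complete MirrorMaker 2 source connector config might look with literal values; the broker addresses, aliases, and the `topics.exclude` pattern below are illustrative assumptions, not values from this PR's sample.

```python
# Illustrative config only; all values below are placeholders.
configs = {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "source",
    "target.cluster.alias": "target",  # usually the primary cluster
    "topics": ".*",  # replicate all topics from the source
    # bootstrap.servers: a hostname:port pair for a broker in each cluster
    "source.cluster.bootstrap.servers": "source-broker:9092",
    "target.cluster.bootstrap.servers": "target-broker:9092",
    # Example exclusion policy: skip MM2-internal and double-underscore topics
    "topics.exclude": "mm2.*, __.*",
    "tasks.max": "3",
}
```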
Contributor Author commented:
@rQ-Qrr responding to your comments on #13522 here:

> I think the default converter for CPS sink connector is org.apache.kafka.connect.converters.ByteArrayConverter on UI. Could you check it? Thanks!

You are correct that this is the default on the UI, but I was following the sample config provided here for consistency: https://screenshot.googleplex.com/5gESHNHH6yh25ex
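For context, a minimal hypothetical sketch of where the converter setting sits in a Pub/Sub sink connector config; the connector class and `cps.*` keys come from the open-source Pub/Sub Kafka connector, and every value is a placeholder rather than this PR's actual sample config.

```python
# Hypothetical connector config for illustration only; topic, subscription,
# and project names are placeholders. The converter class is the UI default
# discussed in the comment above.
configs = {
    "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max": "1",
    "topics": "my-kafka-topic",      # placeholder Kafka topic to read from
    "cps.project": "my-project-id",  # placeholder Google Cloud project
    "cps.topic": "my-pubsub-topic",  # placeholder Pub/Sub destination topic
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
}
```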

managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


def create_pubsub_sink_connector(
project_id: str,
region: str,
managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


def create_pubsub_source_connector(
project_id: str,
region: str,
61 changes: 61 additions & 0 deletions managedkafka/snippets/connect/connectors/delete_connector.py
@@ -0,0 +1,61 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def delete_connector(
project_id: str,
region: str,
connect_cluster_id: str,
connector_id: str,
) -> None:
"""
Delete a connector.

Args:
project_id: Google Cloud project ID.
region: Cloud region.
connect_cluster_id: ID of the Kafka Connect cluster.
connector_id: ID of the connector.

Raises:
This method will raise the GoogleAPICallError exception if the operation errors.
"""
# [START managedkafka_delete_connector]
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
ManagedKafkaConnectClient,
)
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"
# connector_id = "my-connector"

connect_client = ManagedKafkaConnectClient()

request = managedkafka_v1.DeleteConnectorRequest(
name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
)

try:
operation = connect_client.delete_connector(request=request)
print(f"Waiting for operation {operation.operation.name} to complete...")
operation.result()
Contributor commented:

medium

Deleting a connector can be a long-running operation. It's a good practice to add a timeout to operation.result() to prevent the script from waiting indefinitely.

Suggested change:
- operation.result()
+ operation.result(timeout=3000)

A follow-up comment: Could you apply this suggestion? Thanks!

print("Deleted connector")
except GoogleAPICallError as e:
print(f"The operation failed with error: {e}")

# [END managedkafka_delete_connector]
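A minimal sketch of the reviewer's timeout suggestion applied to the sample above; the 3000-second value mirrors the suggested change, and the separate `TimeoutError` handling is an illustrative addition, not part of the merged sample. All identifiers are placeholders.

```python
from concurrent.futures import TimeoutError

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)

# Placeholder identifiers for illustration.
project_id = "my-project-id"
region = "us-central1"
connect_cluster_id = "my-connect-cluster"
connector_id = "my-connector"

connect_client = ManagedKafkaConnectClient()
request = managedkafka_v1.DeleteConnectorRequest(
    name=connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    ),
)

try:
    operation = connect_client.delete_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # Bound the wait so the script does not block indefinitely on the LRO.
    operation.result(timeout=3000)
    print("Deleted connector")
except TimeoutError:
    print("Timed out waiting for the delete operation to complete.")
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")
```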