-
Notifications
You must be signed in to change notification settings - Fork 6
138 kafka.read block
#355
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
138 kafka.read block
#355
Changes from all commits
06ec064
edbec52
da42af4
831a7e6
7b501e1
69f1235
0e2ee03
1cc3106
814648d
cb784a5
38b59b6
2e71e8d
0dfaaf9
580af85
824fa67
81f107d
3533eb0
9a74193
26d7397
53c81f1
61805b5
ddb1ecb
e5425ed
1e956fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| import json | ||
| import logging | ||
| from abc import ABCMeta | ||
| from itertools import count | ||
| from typing import AsyncGenerator, List, Optional | ||
|
|
||
| import orjson | ||
| from confluent_kafka import Consumer, KafkaError | ||
| from datayoga_core import utils | ||
| from datayoga_core.context import Context | ||
| from datayoga_core.producer import Message | ||
| from datayoga_core.producer import Producer as DyProducer | ||
|
|
||
| logger = logging.getLogger("dy") | ||
|
|
||
|
|
||
class Block(DyProducer, metaclass=ABCMeta):
    """Producer block that consumes JSON messages from a Kafka topic.

    Reads messages with a confluent_kafka Consumer and yields them one at a
    time, tagging each record with its Kafka offset under MSG_ID_FIELD.
    In snapshot mode the block quits once the topic is drained.
    """

    bootstrap_servers: str
    group: str
    topic: str
    seek_to_beginning: bool
    snapshot: bool

    INTERNAL_FIELD_PREFIX = "__$$"
    MSG_ID_FIELD = f"{INTERNAL_FIELD_PREFIX}msg_id"
    # commit consumed offsets every N processed messages
    MIN_COMMIT_COUNT = 10
    # confluent_kafka sentinel for "earliest offset" (OFFSET_BEGINNING)
    OFFSET_BEGINNING = -2
    # poll timeout (seconds) in snapshot mode; a None timeout blocks forever
    SNAPSHOT_POLL_TIMEOUT_SECONDS = 3.0

    def init(self, context: Optional[Context] = None):
        """Resolve connection details and read the block properties.

        Args:
            context: Optional runtime context used to resolve the connection.
        """
        logger.debug(f"Initializing {self.get_block_name()}")
        connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context)
        logger.debug(f"Connection details: {json.dumps(connection_details)}")

        self.bootstrap_servers = connection_details.get("bootstrap_servers")
        self.group = self.properties.get("group")
        self.topic = self.properties["topic"]
        self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
        self.snapshot = self.properties.get("snapshot", False)

    async def produce(self) -> AsyncGenerator[List[Message], None]:
        """Yield messages from the Kafka topic as single-record batches.

        Yields:
            Lists containing one message dict each, with the Kafka offset
            stored under MSG_ID_FIELD.
        """
        consumer = Consumer(**{
            "bootstrap.servers": self.bootstrap_servers,
            "group.id": self.group,
            "enable.auto.commit": False,
            "auto.offset.reset": "earliest"
        })
        logger.debug(f"Producing {self.get_block_name()}")

        if self.seek_to_beginning:
            def rewind_partitions(assigned_consumer: Consumer, partitions: List) -> None:
                # Rewind every assigned partition to the start of the topic.
                for partition in partitions:
                    partition.offset = self.OFFSET_BEGINNING
                assigned_consumer.assign(partitions)

            consumer.subscribe([self.topic], rewind_partitions)
            logger.debug(f"Seeking to beginning on topic {self.topic}")
        else:
            consumer.subscribe([self.topic])

        # Single counter shared across the whole loop. The original created a
        # fresh itertools.count() per iteration, so the counter was always 0
        # and a synchronous commit was issued after every single message.
        message_counter = count(1)
        try:
            while True:
                # In snapshot mode use a finite timeout so a drained topic
                # returns None and we can quit; otherwise block indefinitely.
                msg = consumer.poll(self.SNAPSHOT_POLL_TIMEOUT_SECONDS if self.snapshot else None)
                if msg is None:
                    assert self.snapshot
                    # Expected behavior in snapshot mode, hence info not warning.
                    logger.info(f"Snapshot defined quitting on topic {self.topic}")
                    break
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        logger.info("Reached end of partition")
                    else:
                        # Handle other errors
                        logger.error(f"Error: {msg.error()}")
                else:
                    # Process the message
                    message = orjson.loads(msg.value())
                    yield [{self.MSG_ID_FIELD: msg.offset(), **message}]
                    if next(message_counter) % self.MIN_COMMIT_COUNT == 0:
                        consumer.commit(asynchronous=False)
        finally:
            try:
                # Flush any offsets not yet committed by the periodic commit.
                consumer.commit(asynchronous=False)
            except Exception as e:
                logger.error(f"Error while committing offsets: {e}")
            finally:
                # Close the consumer even if the final commit failed.
                consumer.close()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| { | ||
| "title": "kafka.read", | ||
| "description": "Read messages from a Kafka topic", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change to: |
||
| "type": "object", | ||
| "properties": { | ||
| "bootstrap_servers": { | ||
| "description": "Host name", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Host name |
||
| "type": "string" | ||
| }, | ||
| "topic": { | ||
| "description": "Kafka topic", | ||
| "type": "string" | ||
| }, | ||
| "group": { | ||
| "description": "Kafka group", | ||
| "type": "string" | ||
| }, | ||
| "seek_to_beginning": { | ||
| "description": "Consumer seek to beginning", | ||
| "type": "boolean" | ||
| }, | ||
| "snapshot": { | ||
| "type": "boolean", | ||
| "title": "Snapshot current entries and quit", | ||
| "description": "Snapshot current entries and quit", | ||
| "default": false | ||
| } | ||
| }, | ||
| "required": ["bootstrap_servers", "topic"] | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,8 @@ | |
| "mysql", | ||
| "postgresql", | ||
| "redis", | ||
| "sqlserver" | ||
| "sqlserver", | ||
| "kafka" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's keep the list alphabetically sorted. |
||
| ] | ||
| }, | ||
| "if": { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| { | ||
| "title": "Kafka", | ||
| "description": "Schema for configuring Kafka connection parameters", | ||
| "type": "object", | ||
| "properties": { | ||
| "type": { | ||
| "description": "Connection type", | ||
| "type": "string", | ||
| "const": "kafka" | ||
| }, | ||
| "bootstrap_servers": { | ||
| "description": "Kafka Hosts List comma separated", | ||
| "type": "string" | ||
| } | ||
| }, | ||
| "additionalProperties": false, | ||
| "required": ["type", "bootstrap_servers"], | ||
| "examples": [ | ||
| { | ||
| "kafka": { | ||
| "type": "kafka", | ||
| "bootstrap_servers": "localhost:9092,localhost:9093" | ||
| } | ||
| } | ||
| ] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| --- | ||
| parent: Blocks | ||
| grand_parent: Reference | ||
| --- | ||
|
|
||
| # kafka\.read | ||
|
|
||
| Read from the kafka consumer | ||
|
|
||
|
|
||
| **Properties** | ||
|
|
||
| |Name|Type|Description|Required| | ||
| |----|----|-----------|--------| | ||
| |**bootstrap\_servers**|`string`|Host name<br/>|yes| | ||
| |**topic**|`string`|Kafka topic<br/>|yes| | ||
| |**group**|`string`|Kafka group<br/>|no| | ||
| |**seek\_to\_beginning**|`boolean`|Consumer seek to beginning<br/>|no| | ||
| |**snapshot**<br/>(Snapshot current entries and quit)|`boolean`|Snapshot current entries and quit<br/>Default: `false`<br/>|no| | ||
|
|
||
| **Example** | ||
|
|
||
| ```yaml | ||
| snapshot: false | ||
|
|
||
| ``` | ||
|
|
||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| --- | ||
| parent: Connection Types | ||
| grand_parent: Reference | ||
| --- | ||
|
|
||
| # Kafka | ||
|
|
||
| Schema for configuring Kafka connection parameters | ||
|
|
||
|
|
||
| **Properties** | ||
|
|
||
| |Name|Type|Description|Required| | ||
| |----|----|-----------|--------| | ||
| |**type**|`string`|Connection type<br/>Constant Value: `"kafka"`<br/>|yes| | ||
| |**bootstrap\_servers**|`string`|Kafka Hosts List comma separated<br/>|yes| | ||
|
|
||
| **Additional Properties:** not allowed | ||
| **Example** | ||
|
|
||
| ```yaml | ||
| kafka: | ||
| type: kafka | ||
| bootstrap_servers: | ||
| - localhost:9092,localhost:9093 | ||
|
|
||
| ``` | ||
|
|
||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| from kafka import KafkaProducer | ||
| from testcontainers.kafka import KafkaContainer | ||
|
|
||
|
|
||
def get_kafka_container() -> KafkaContainer:
    """Return a Kafka test container with the broker port bound to the host.

    Binding KafkaContainer.KAFKA_PORT to the same host port makes the broker
    reachable from the test process at a predictable address.
    """
    return KafkaContainer(image="confluentinc/cp-kafka:latest").with_bind_ports(
        KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT
    )


def get_kafka_producer(bootstrap_servers: str) -> KafkaProducer:
    """Return a KafkaProducer connected to the given bootstrap servers."""
    return KafkaProducer(bootstrap_servers=bootstrap_servers)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| input: | ||
| uses: kafka.read | ||
| with: | ||
| bootstrap_servers: kafka | ||
| topic: integration-tests | ||
| group: integration-tests | ||
| snapshot: true | ||
| steps: | ||
| - uses: redis.write | ||
| with: | ||
| connection: cache | ||
| command: HSET | ||
| key: | ||
| expression: id | ||
| language: jmespath |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| input: | ||
| uses: kafka.read | ||
| with: | ||
| bootstrap_servers: kafka | ||
| topic: integration-tests | ||
| group: integration-tests | ||
| snapshot: true | ||
| steps: | ||
| - uses: std.write |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| import json | ||
| import logging | ||
|
|
||
| from common import kafka_utils, redis_utils | ||
| from common.utils import run_job | ||
|
|
||
| logger = logging.getLogger("dy") | ||
| message_one = b'{"id":"1","name":"Boris"}' | ||
| message_two = b'{"id":"2","name":"Ivan"}' | ||
|
|
||
|
|
||
def test_kafka_to_redis():
    """End-to-end test: produce two messages to Kafka, run the job, verify Redis.

    The Redis container is created before the try block so the finally clause
    can always reference it; the original assigned it inside the try, which
    raised NameError in finally if Kafka container startup failed first.
    """
    kafka_container = kafka_utils.get_kafka_container()
    redis_container = redis_utils.get_redis_oss_container(redis_utils.REDIS_PORT)
    try:
        with kafka_container as kafka:
            redis_container.start()

            bootstrap_servers = kafka.get_bootstrap_server()
            producer = kafka_utils.get_kafka_producer(bootstrap_servers)
            producer.send("integration-tests", message_one)
            producer.send("integration-tests", message_two)
            producer.flush()
            run_job("tests.kafka_to_redis")

            redis_client = redis_utils.get_redis_client("localhost", redis_utils.REDIS_PORT)

            assert len(redis_client.keys()) == 2

            boris = redis_client.hgetall("1")
            ivan = redis_client.hgetall("2")

            assert boris == json.loads(message_one.decode())
            assert ivan == json.loads(message_two.decode())
    finally:
        redis_container.stop()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| import logging | ||
| import os | ||
|
|
||
| from common import kafka_utils | ||
| from common.utils import run_job | ||
|
|
||
| logger = logging.getLogger("dy") | ||
| message_one = b'{"id":1,"name":"Boris"}' | ||
| message_two = b'{"id":2,"name":"Ivan"}' | ||
| message_three = b'{"id":3,"name":"Yossi"}' | ||
| message_four = b'{"id":4,"name":"Adi"}' | ||
|
|
||
|
|
||
def test_kafka_to_stdout(tmpdir):
    """Two-pass snapshot test: each run should only see the newly produced messages.

    Runs the snapshot job twice against the same consumer group and asserts
    that the second run picks up only the messages produced after the first,
    i.e. committed offsets are honored.
    """
    kafka_container = kafka_utils.get_kafka_container()
    output_file = tmpdir.join("tests_kafka_to_stdout.txt")
    try:
        with kafka_container as kafka:
            bootstrap_servers = kafka.get_bootstrap_server()
            producer = kafka_utils.get_kafka_producer(bootstrap_servers)
            producer.send("integration-tests", message_one)
            producer.send("integration-tests", message_two)
            producer.flush()

            run_job("tests.kafka_to_stdout", None, output_file)
            result = output_file.readlines()
            assert len(result) == 2
            assert result[0].strip().encode() == message_one
            assert result[1].strip().encode() == message_two

            producer.send("integration-tests", message_three)
            producer.send("integration-tests", message_four)
            producer.flush()

            run_job("tests.kafka_to_stdout", None, output_file)
            result = output_file.readlines()
            assert len(result) == 2
            assert result[0].strip().encode() == message_three
            assert result[1].strip().encode() == message_four
    finally:
        # The job may have failed before ever writing the file; only remove
        # it if it exists so cleanup does not mask the original failure.
        if os.path.exists(output_file):
            os.remove(output_file)
Uh oh!
There was an error while loading. Please reload this page.