# mentortools/libs/: kafka-tools-abm-2.0.56296a0 metadata and description

kafka tools

| Field | Value |
|---|---|
| author | i.vasiliev |
| author_email | i.vasiliev@technokert.ru |
| classifiers | |
| description_content_type | text/markdown |
| requires_dist | |
| requires_python | >=3.11,<4.0 |

| File | Tox results | History |
|---|---|---|
| kafka_tools_abm-2.0.56296a0-py3-none-any.whl | | |
| kafka_tools_abm-2.0.56296a0.tar.gz | | |
## Consumer usage

```python
import asyncio
import logging
import sys

from async_tools import AsyncOnStart

from kafka_tools.utils.deserialize_json import deserialize_json
from kafka_tools.utils.deserialize_string import deserialize_string
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import RecordProcessor

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


async def callback_1(record: KafkaRecord):
    logger.info(f"callback_1 record: {record}, type: {type(record.value)}")
    await asyncio.sleep(1)


async def callback_2(record: KafkaRecord):
    logger.info(f"callback_2 record: {record}, type: {type(record.value)}")
    await asyncio.sleep(1)


async def main():
    consumer = Consumer(Consumer.Config("127.0.0.1:9092", group_id="my_group_id", topic_auto_offset_is_latest=False))
    # RecordProcessor calls consumer.subscribe in __init__
    processor_1 = RecordProcessor(
        RecordProcessor.Config(topic="my_topic"), context=RecordProcessor.Context(consumer),
        key_deserializer=deserialize_string, value_deserializer=deserialize_json, record_callback=callback_1
    )
    processor_2 = RecordProcessor(
        RecordProcessor.Config(topic="my_another_topic"), context=RecordProcessor.Context(consumer),
        key_deserializer=deserialize_string, value_deserializer=deserialize_string, record_callback=callback_2
    )
    await AsyncOnStart.start_if_necessary(consumer)
    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())
```
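The record callback is an ordinary async function that receives a `KafkaRecord` whose value has already been deserialized. As a hedged sketch (the `event_type` field and the `isinstance` guard are illustrative, not part of the library), a callback for JSON payloads might defend against malformed records like this:

```python
async def safe_json_callback(record: KafkaRecord):
    # With value_deserializer=deserialize_json the value is expected to be a dict,
    # but a misbehaving producer can still put arbitrary bytes on the topic.
    if not isinstance(record.value, dict):
        logger.warning(f"skipping non-JSON record: {record}")
        return
    event_type = record.value.get("event_type")  # illustrative field name
    logger.info(f"processing event_type={event_type}")
```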
## TopicSender usage

```python
import asyncio
import logging
import sys

from async_tools import AsyncOnStart, AsyncOnStop

from kafka_tools.kafka_client.abstract_kafka_client import AbstractKafkaClient
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.utils.serialize_string import serialize_string
from kafka_tools.utils.serialize_json import serialize_json

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


async def main():
    kafka_address = "127.0.0.1:9092"
    producer: Producer = Producer(AbstractKafkaClient.Config(address=kafka_address))
    topic_sender: TopicSender = TopicSender(
        config=TopicSender.Config("topic_with_jsons"),
        context=TopicSender.Context(producer=producer),
        key_serializer=serialize_string, value_serializer=serialize_json
    )
    await AsyncOnStart.start_if_necessary(producer)
    await topic_sender.produce(key="test_key", value={'asd': 'val_0'})
    # produce() only adds the message to the send queue, so if the program ended here, no message would be published
    publish_tasks = [await topic_sender.produce(key="test_key_1", value={'asd': 'val_1'}),
                     await topic_sender.produce(key="test_key_2", value={'asd': 'val_2'}),
                     await topic_sender.produce(key="test_key_3", value={'asd': 'val_3'}),
                     await topic_sender.produce(key="test_key_1", value={'asd': 'val_4'})]
    await asyncio.gather(*publish_tasks)
    await AsyncOnStop.stop_if_necessary(producer)


if __name__ == '__main__':
    asyncio.run(main())
```
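The example above collects the values returned by `produce()` and awaits them with `asyncio.gather`, which suggests they are per-message publish tasks. Under that assumption, a small batching helper could look like the sketch below; `send_batch` is not part of the library:

```python
async def send_batch(topic_sender: TopicSender, items: dict[str, dict]) -> None:
    # Enqueue every message; each produce() call returns an awaitable publish task.
    publish_tasks = [await topic_sender.produce(key=key, value=value)
                     for key, value in items.items()]
    # Wait until all of them have actually been published.
    await asyncio.gather(*publish_tasks)
```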
## Customize Deserializer and Serializer

```python
import asyncio
import json
import logging
import sys
import time
from dataclasses import dataclass, asdict, field
from enum import IntEnum

from async_tools import AsyncOnStop, AsyncOnStart

from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import RecordProcessor

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


class Status(IntEnum):
    not_ok = 0
    ok = 1
@dataclass
class SomeData:
    id: int
    status: Status
    # default_factory so the timestamp is computed per instance, not once at import time
    created_at: int = field(default_factory=lambda: int(time.time()))


def deserialize_some_data(payload: bytes | None) -> SomeData | None:
    # empty payloads deserialize to None
    if payload is None:
        return None
    _dict = json.loads(payload.decode())
    return SomeData(_dict["id"], Status(_dict["status"]), _dict["created_at"])


def serialize_some_data(some_dataclass: SomeData) -> bytes:
    _dict = asdict(some_dataclass)
    return json.dumps(_dict).encode()


def deserialize_string_to_int(payload: bytes | None) -> int | None:
    # empty payloads deserialize to None
    if payload is None:
        return None
    return int(payload.decode())


def serialize_int_to_string(int_: int) -> bytes:
    return str(int_).encode()
async def record_callback(record: KafkaRecord):
    logger.info(f"record_callback record: {record}, value type: {type(record.value)}")


async def main():
    topic = "topic_with_dataclasses"
    kafka_address = "127.0.0.1:9092"
    group_id = "my_group_id"
    consumer = Consumer(Consumer.Config(address=kafka_address, group_id=group_id, topic_auto_offset_is_latest=False))
    processor = RecordProcessor(
        config=RecordProcessor.Config(topic=topic),
        context=RecordProcessor.Context(consumer),
        key_deserializer=deserialize_string_to_int, value_deserializer=deserialize_some_data,
        record_callback=record_callback
    )
    producer = Producer(Producer.Config(kafka_address))
    topic_sender: TopicSender = TopicSender(
        config=TopicSender.Config(topic),
        context=TopicSender.Context(producer=producer),
        key_serializer=serialize_int_to_string, value_serializer=serialize_some_data
    )
    await AsyncOnStart.start_if_necessary(consumer)
    await AsyncOnStart.start_if_necessary(producer)
    for i in range(0, 100):
        my_class = SomeData(i, Status(0))
        await topic_sender.produce(key=my_class.id, value=my_class)
    await asyncio.sleep(10)
    await AsyncOnStop.stop_if_necessary(producer)
    await AsyncOnStop.stop_if_necessary(consumer)
    logger.info("----===== Deinit done ====----")


if __name__ == '__main__':
    asyncio.run(main())
```
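A quick way to sanity-check a custom serializer/deserializer pair before wiring it into a RecordProcessor is a plain round-trip using only the functions defined above (the concrete field values are arbitrary):

```python
def test_some_data_round_trip():
    original = SomeData(id=1, status=Status.ok, created_at=1700000000)
    # serialize to bytes and back; the dataclass-generated __eq__ compares all fields
    restored = deserialize_some_data(serialize_some_data(original))
    assert restored == original
    assert deserialize_string_to_int(serialize_int_to_string(42)) == 42
```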
## BufferRecordProcessor usage

```python
import asyncio
import logging
import sys

from async_tools import AsyncOnStart

from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import BufferRecordProcessor
from kafka_tools.kafka_client.abstract_kafka_client import AbstractKafkaClient
from kafka_tools.utils.deserialize_string import deserialize_string
from kafka_tools.utils.serialize_string import serialize_string

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)

max_delay_between_process = 5
max_values_in_buffer = 2
buffer_callback_processing_time = 5


async def buffer_callback(records: list[KafkaRecord]):
    logger.info(f"buffer_callback records: {len(records)}, records: {records}")
    await asyncio.sleep(buffer_callback_processing_time)
    logger.info("buffer_callback finished")


async def main():
    topic = "test_buffer"
    kafka_address = "127.0.0.1:9092"
    group_id = "my_group_id"
    consumer = Consumer(Consumer.Config(kafka_address, group_id=group_id, topic_auto_offset_is_latest=False))
    buffer_record_processor = BufferRecordProcessor(
        config=BufferRecordProcessor.Config(topic=topic,
                                            MAX_VALUES_IN_BUFFER=max_values_in_buffer,
                                            MAX_DELAY_BETWEEN_PROCESS=max_delay_between_process),
        context=BufferRecordProcessor.Context(consumer),
        key_deserializer=deserialize_string, value_deserializer=deserialize_string, record_callback=buffer_callback
    )
    await AsyncOnStart.start_if_necessary(buffer_record_processor)
    await AsyncOnStart.start_if_necessary(consumer)
    producer: Producer = Producer(AbstractKafkaClient.Config(kafka_address))
    topic_sender: TopicSender = TopicSender(
        config=TopicSender.Config(topic=topic),
        context=TopicSender.Context(producer=producer),
        key_serializer=serialize_string, value_serializer=serialize_string
    )
    await AsyncOnStart.start_if_necessary(producer)
    for i in range(0, max_values_in_buffer * 2 + 1):
        key = "test_key_" + str(i)
        value = "value_" + str(i)
        await topic_sender.produce(key=key, value=value)
    await asyncio.sleep(20)
    for i in range(0, max_values_in_buffer):
        key = "after_sleep_key_" + str(i)
        value = "after_sleep_value_" + str(i)
        await topic_sender.produce(key=key, value=value)
    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())
```
## Kafka dict usage

The topic should be preconfigured with `cleanup.policy=compact`; if it does not exist, `KafkaDictConsumer` creates it with this config automatically.
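If you want to create the compacted topic yourself (for example with different partition settings), a minimal sketch using kafka-python's admin client could look like this; the partition and replication values are illustrative:

```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
# Log compaction keeps only the latest record per key, which is what a topic-backed dict needs.
admin.create_topics([NewTopic(
    name="test_kafka_dict",
    num_partitions=1,        # illustrative
    replication_factor=1,    # illustrative
    topic_configs={"cleanup.policy": "compact"},
)])
admin.close()
```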
```python
import asyncio
import logging
import sys

from kafka import KafkaAdminClient

from async_tools import AsyncOnStart, AsyncOnStop

from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_dict.kafka_dict import KafkaTopicDict
from kafka_tools.kafka_dict.kafka_dict_consumer import KafkaDictConsumer
from kafka_tools.utils.deserialize_string import deserialize_string
from kafka_tools.utils.deserialize_json import deserialize_json
from kafka_tools.utils.serialize_string import serialize_string
from kafka_tools.utils.serialize_json import serialize_json

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.DEBUG))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.DEBUG))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)
logging.getLogger('kafka').setLevel(logging.WARNING)


async def on_new_key(key, value):
    logger.info(f"got new key: {key} with value: {value}")


async def on_update_key(key, value):
    logger.info(f"got update for key: {key} with value: {value}")


def on_delete_key(key):
    logger.info(f"on delete key: {key}")


async def main():
    topic = "test_kafka_dict"
    kafka_address = "127.0.0.1:9092"
    producer = Producer(Producer.Config(kafka_address))
    kafka_admin_client = KafkaAdminClient(bootstrap_servers=kafka_address)
    # KafkaDictConsumer creates the topic with `--config "cleanup.policy=compact"` if it does not exist
    consumer = KafkaDictConsumer(KafkaDictConsumer.Config(kafka_address),
                                 KafkaDictConsumer.Context(kafka_admin_client=kafka_admin_client))
    kafka_dict = KafkaTopicDict(
        config=KafkaTopicDict.Config(topic=topic),
        context=KafkaTopicDict.Context(producer=producer, consumer=consumer),
        key_serializer=serialize_string, value_serializer=serialize_json,
        key_deserializer=deserialize_string, value_deserializer=deserialize_json,
    )
    kafka_dict.subscribe_on_new_key(on_new_key)
    kafka_dict.subscribe_on_key_update(on_update_key)
    kafka_dict.subscribe_on_key_delete(on_delete_key)
    await AsyncOnStart.start_if_necessary(producer)
    await AsyncOnStart.start_if_necessary(consumer)
    await AsyncOnStart.start_if_necessary(kafka_dict)
    logger.info(f"dict after init: {kafka_dict.copy()}")
    await kafka_dict.wait_all_msgs()
    logger.info(f"dict after wait_all_msgs: {kafka_dict.copy()}")
    kafka_dict.set("key_from_topic", {"val": {"created_at": 112356}})
    logger.info(f"common get after set: {kafka_dict.get('key_from_topic')}")
    logger.info(f"async get after set: {await kafka_dict.aget('key_from_topic')}")
    await kafka_dict.set_with_confirm("key_from_topic_1", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_2", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_3", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 112357}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 11235744}})
    val = await kafka_dict.pop_with_confirm("key_from_topic_5")
    logger.info(f"pop result: {val}")
    val = kafka_dict.get("key_from_topic_5")
    logger.info(f"get after pop: {val}")
    logger.info(kafka_dict.copy())
    await AsyncOnStop.stop_if_necessary(consumer)
    await AsyncOnStop.stop_if_necessary(producer)
    await AsyncOnStop.stop_if_necessary(kafka_dict)


if __name__ == '__main__':
    asyncio.run(main())
```