mentortools/libs/: kafka-tools-abm-4.0.74169 metadata and description
kafka tools
| Field | Value |
|---|---|
| author | i.vasiliev |
| author_email | i.vasiliev@technokert.ru |
| classifiers | |
| description_content_type | text/markdown |
| requires_dist | |
| requires_python | >=3.11,<4.0 |
| File | Tox results | History |
|---|---|---|
| kafka_tools_abm-4.0.74169-py3-none-any.whl | | |
| kafka_tools_abm-4.0.74169.tar.gz | | |
Consumer usage
```python
import asyncio
import logging
import sys

from kafka_tools.deserializer import RecordDeserializer, StringDeserializer, JsonDeserializer
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import RecordProcessor

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


async def callback_1(record: KafkaRecord):
    logger.info(f"callback_1 record: {record}, type: {type(record.value)}")
    await asyncio.sleep(1)


async def callback_2(record: KafkaRecord):
    logger.info(f"callback_2 record: {record}, type: {type(record.value)}")
    await asyncio.sleep(1)


async def amain():
    consumer = Consumer(Consumer.Config("127.0.0.1:9092", group_id="my_group_id", topic_auto_offset_is_latest=False))
    deserializer_for_topic_1 = RecordDeserializer(RecordDeserializer.Context(StringDeserializer(), JsonDeserializer()))
    # RecordProcessor calls consumer.subscribe in __init__
    processor_1 = RecordProcessor(RecordProcessor.Config(topic="my_topic"),
                                  context=RecordProcessor.Context(consumer, deserializer_for_topic_1),
                                  record_callback=callback_1)
    deserializer_for_topic_2 = RecordDeserializer(
        RecordDeserializer.Context(StringDeserializer(), StringDeserializer()))
    processor_2 = RecordProcessor(RecordProcessor.Config(topic="my_another_topic"),
                                  context=RecordProcessor.Context(consumer, deserializer_for_topic_2),
                                  record_callback=callback_2)
    await consumer.async_init()
    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(amain())
```
TopicSender usage
```python
import asyncio
import logging
import sys

from kafka_tools.kafka_client.abstract_kafka_client import AbstractKafkaClient
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.serializer import RecordSerializer, StringSerializer, JsonSerializer

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


async def amain():
    kafka_address = "127.0.0.1:9092"
    producer: Producer = Producer(AbstractKafkaClient.Config(address=kafka_address))
    serializer = RecordSerializer(RecordSerializer.Context(key_serializer=StringSerializer(),
                                                           value_serializer=JsonSerializer()))
    topic_sender: TopicSender = TopicSender(TopicSender.Config("topic_with_jsons"),
                                            TopicSender.Context(
                                                serializer=serializer,
                                                producer=producer))
    await producer.async_init()
    await topic_sender.produce(key="test_key", value={'asd': 'val_0'})
    # produce only adds the record to the send queue, so if the program ended here, no message would be published
    publish_tasks = [await topic_sender.produce(key="test_key_1", value={'asd': 'val_1'}),
                     await topic_sender.produce(key="test_key_2", value={'asd': 'val_2'}),
                     await topic_sender.produce(key="test_key_3", value={'asd': 'val_3'}),
                     await topic_sender.produce(key="test_key_1", value={'asd': 'val_4'})]
    await asyncio.gather(*publish_tasks)
    await producer.async_deinit()


if __name__ == '__main__':
    asyncio.run(amain())
```
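As the comment in the example notes, `produce` only queues the record for sending; the values returned by the awaited `produce` calls are gathered afterwards, which appears to be how the example waits for the queued messages to actually be published before calling `async_deinit`.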
Customize Deserializer and Serializer
```python
import asyncio
import logging
import sys
import time
from dataclasses import dataclass, asdict
from enum import IntEnum
from typing import Optional

from kafka_tools.deserializer import RecordDeserializer, StringDeserializer, JsonDeserializer
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import RecordProcessor
from kafka_tools.serializer import StringSerializer, JsonSerializer, RecordSerializer

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


class Status(IntEnum):
    not_ok = 0
    ok = 1


@dataclass
class SomeData:
    id: int
    status: Status
    created_at: int = time.time()


class SomeDataDeserializer(JsonDeserializer):
    def deserialize(self, s: bytes) -> SomeData:
        _dict = super().deserialize(s)
        return SomeData(_dict["id"], Status(_dict["status"]), _dict["created_at"])


class SomeDataSerializer(JsonSerializer):
    def serialize(self, some_dataclass: SomeData) -> bytes:
        _dict = asdict(some_dataclass)
        return super().serialize(_dict)


class StaringToIntDeserializer(StringDeserializer):
    def deserialize(self, s: bytes) -> Optional[int]:
        value = super().deserialize(s)
        if value is not None:
            return int(value)


class IntToStingSerializer(StringSerializer):
    def serialize(self, s: str) -> bytes:
        s = str(s)
        return super().serialize(s)


async def record_callback(record: KafkaRecord):
    logger.info(f"record_callback record: {record}, value type: {type(record.value)}")


async def amain():
    topic = "topic_with_dataclasses"
    kafka_address = "127.0.0.1:9092"
    group_id = "my_group_id"
    consumer = Consumer(Consumer.Config(address=kafka_address, group_id=group_id, topic_auto_offset_is_latest=False))
    deserializer = RecordDeserializer(RecordDeserializer.Context(key_deserializer=StaringToIntDeserializer(),
                                                                 value_deserializer=SomeDataDeserializer()))
    processor = RecordProcessor(RecordProcessor.Config(topic=topic),
                                context=RecordProcessor.Context(consumer, deserializer),
                                record_callback=record_callback)
    serializer = RecordSerializer(RecordSerializer.Context(key_serializer=IntToStingSerializer(),
                                                           value_serializer=SomeDataSerializer()))
    producer = Producer(Producer.Config(kafka_address))
    topic_sender: TopicSender = TopicSender(TopicSender.Config(topic),
                                            TopicSender.Context(serializer=serializer, producer=producer))
    await consumer.async_init()
    await producer.async_init()
    for i in range(0, 100):
        my_class = SomeData(i, Status(0))
        await topic_sender.produce(key=my_class.id, value=my_class)
    await asyncio.sleep(10)
    await producer.async_deinit()
    await consumer.async_deinit()
    logger.info("----===== Deinit done ====----")


if __name__ == '__main__':
    asyncio.run(amain())
```
BufferRecordProcessor usage
```python
import asyncio
import logging
import sys

from kafka_tools.deserializer import RecordDeserializer, StringDeserializer
from kafka_tools.kafka_client.abstract_kafka_client import AbstractKafkaClient
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import BufferRecordProcessor
from kafka_tools.serializer import RecordSerializer, StringSerializer

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)

max_delay_between_process = 5
max_values_in_buffer = 2
buffer_callback_processing_time = 5


async def buffer_callback(records: list[KafkaRecord]):
    logger.info(f"buffer_callback records: {len(records)}, records: {records}")
    await asyncio.sleep(buffer_callback_processing_time)
    logger.info("buffer_callback finished")


async def amain():
    topic = "test_buffer"
    kafka_address = "127.0.0.1:9092"
    group_id = "my_group_id"
    consumer = Consumer(Consumer.Config(kafka_address, group_id=group_id, topic_auto_offset_is_latest=False))
    deserializer_for_topic_2 = RecordDeserializer(
        RecordDeserializer.Context(StringDeserializer(), StringDeserializer()))
    buffer_record_processor = BufferRecordProcessor(
        config=BufferRecordProcessor.Config(topic=topic,
                                            MAX_VALUES_IN_BUFFER=max_values_in_buffer,
                                            MAX_DELAY_BETWEEN_PROCESS=max_delay_between_process),
        context=BufferRecordProcessor.Context(consumer, deserializer_for_topic_2),
        record_callback=buffer_callback)
    await buffer_record_processor.async_init()
    await consumer.async_init()
    producer: Producer = Producer(AbstractKafkaClient.Config(kafka_address))
    serializer = RecordSerializer(
        RecordSerializer.Context(key_serializer=StringSerializer(), value_serializer=StringSerializer()))
    topic_sender: TopicSender = TopicSender(TopicSender.Config(topic=topic),
                                            TopicSender.Context(serializer=serializer, producer=producer))
    await producer.async_init()
    for i in range(0, max_values_in_buffer * 2 + 1):
        key = "test_key_" + str(i)
        value = "value_" + str(i)
        await topic_sender.produce(key=key, value=value)
    await asyncio.sleep(20)
    for i in range(0, max_values_in_buffer):
        key = "after_sleep_key_" + str(i)
        value = "after_sleep_value_" + str(i)
        await topic_sender.produce(key=key, value=value)
    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(amain())
```
kafka dict usage
The topic should be preconfigured with `cleanup.policy=compact` (KafkaDictConsumer will create the topic with this config if it does not exist).
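If you prefer to create the compacted topic up front instead of relying on `KafkaDictConsumer`, a minimal sketch using kafka-python's admin client could look like the following; the topic name, partition count, and replication factor are illustrative placeholders:

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Create a log-compacted topic for the kafka dict (placeholder name and sizing).
admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
admin.create_topics([NewTopic(name="test_kafka_dict",
                              num_partitions=1,
                              replication_factor=1,
                              topic_configs={"cleanup.policy": "compact"})])
admin.close()
```

The full example: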
```python
import asyncio
import logging
import sys

from kafka import KafkaAdminClient

from kafka_tools.deserializer import RecordDeserializer, StringDeserializer, JsonDeserializer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_dict.kafka_dict import KafkaTopicDict
from kafka_tools.kafka_dict.kafka_dict_consumer import KafkaDictConsumer
from kafka_tools.serializer import StringSerializer, JsonSerializer, RecordSerializer

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.DEBUG))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.DEBUG))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)
logging.getLogger('kafka').setLevel(logging.WARNING)


async def on_new_key(key, value):
    logger.info(f"got new key: {key} with value: {value}")


async def on_update_key(key, value):
    logger.info(f"got update for key: {key} with value: {value}")


def on_delete_key(key):
    logger.info(f"on delete key: {key}")


async def amain():
    topic = "test_kafka_dict"
    kafka_address = "127.0.0.1:9092"
    deserializer = RecordDeserializer(RecordDeserializer.Context(key_deserializer=StringDeserializer(),
                                                                 value_deserializer=JsonDeserializer()))
    serializer = RecordSerializer(RecordSerializer.Context(key_serializer=StringSerializer(),
                                                           value_serializer=JsonSerializer()))
    producer = Producer(Producer.Config(kafka_address))
    kafka_admin_client = KafkaAdminClient(bootstrap_servers=kafka_address)
    # KafkaDictConsumer creates the topic with `--config "cleanup.policy=compact"` if it does not exist
    consumer = KafkaDictConsumer(KafkaDictConsumer.Config(kafka_address),
                                 KafkaDictConsumer.Context(kafka_admin_client=kafka_admin_client))
    kafka_dict = KafkaTopicDict(KafkaTopicDict.Config(topic=topic),
                                context=KafkaTopicDict.Context(serializer=serializer,
                                                               deserializer=deserializer,
                                                               producer=producer,
                                                               consumer=consumer))
    kafka_dict.subscribe_on_new_key(on_new_key)
    kafka_dict.subscribe_on_key_update(on_update_key)
    kafka_dict.subscribe_on_key_delete(on_delete_key)
    await producer.async_init()
    await consumer.async_init()
    await kafka_dict.async_init()
    logger.info(f"dict after init: {kafka_dict.copy()}")
    await kafka_dict.wait_all_msgs()
    logger.info(f"dict after wait_all_msgs: {kafka_dict.copy()}")
    kafka_dict.set("key_from_topic", {"val": {"created_at": 112356}})
    logger.info(f"common get after set: {kafka_dict.get('key_from_topic')}")
    logger.info(f"async get after set: {await kafka_dict.aget('key_from_topic')}")
    await kafka_dict.set_with_confirm("key_from_topic_1", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_2", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_3", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 112357}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 11235744}})
    val = await kafka_dict.pop_with_confirm("key_from_topic_5")
    logger.info(f"pop result: {val}")
    val = kafka_dict.get("key_from_topic_5")
    logger.info(f"get after pop: {val}")
    logger.info(kafka_dict.copy())
    await consumer.async_deinit()
    await producer.async_deinit()
    await kafka_dict.async_deinit()


if __name__ == '__main__':
    asyncio.run(amain())
```