mentortools/libs/: kafka-tools-abm-4.0.74169 metadata and description

kafka tools

author i.vasiliev
author_email i.vasiliev@technokert.ru
classifiers
  • Programming Language :: Python :: 3
  • Programming Language :: Python :: 3.11
description_content_type text/markdown
requires_dist
  • aiokafka (>=0.8.0,<0.9.0)
  • async-tools-abm (>=2.0.57875,<3.0.0)
  • kafka-python (==2.0.2)
requires_python >=3.11,<4.0
Files

kafka_tools_abm-4.0.74169-py3-none-any.whl
  • Size: 19 KB
  • Type: Python Wheel
  • Python: 3

kafka_tools_abm-4.0.74169.tar.gz
  • Size: 14 KB
  • Type: Source

Consumer usage

import asyncio
import logging
import sys

from kafka_tools.deserializer import RecordDeserializer, StringDeserializer, JsonDeserializer
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import RecordProcessor

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


async def callback_1(record: KafkaRecord):
    logger.info(f"callback_1 record: {record}, type: {type(record.value)}")
    await asyncio.sleep(1)


async def callback_2(record: KafkaRecord):
    logger.info(f"callback_2 record: {record}, type: {type(record.value)}")
    await asyncio.sleep(1)


async def amain():
    consumer = Consumer(Consumer.Config("127.0.0.1:9092", group_id="my_group_id", topic_auto_offset_is_latest=False))
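    # RecordDeserializer.Context takes the key deserializer first, then the value
    # deserializer: string keys and JSON values for this topic.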
    deserializer_for_topic_1 = RecordDeserializer(RecordDeserializer.Context(StringDeserializer(), JsonDeserializer()))
    # RecordProcessor calls consumer.subscribe() in its __init__
    processor_1 = RecordProcessor(RecordProcessor.Config(topic="my_topic"),
                                  context=RecordProcessor.Context(consumer, deserializer_for_topic_1),
                                  record_callback=callback_1)

    deserializer_for_topic_2 = RecordDeserializer(
        RecordDeserializer.Context(StringDeserializer(), StringDeserializer()))

    processor_2 = RecordProcessor(RecordProcessor.Config(topic="my_another_topic"),
                                  context=RecordProcessor.Context(consumer, deserializer_for_topic_2),
                                  record_callback=callback_2)
    await consumer.async_init()
    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(amain())
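
To stop this example cleanly instead of looping forever, the consumer can be torn down with the same async_deinit() call used in the later examples; a minimal sketch of the tail of amain():

    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await consumer.async_deinit()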

TopicSender usage

import asyncio
import logging
import sys

from kafka_tools.kafka_client.abstract_kafka_client import AbstractKafkaClient
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.serializer import RecordSerializer, StringSerializer, JsonSerializer

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


async def amain():
    kafka_address = "127.0.0.1:9092"
    producer: Producer = Producer(AbstractKafkaClient.Config(address=kafka_address))
    serializer = RecordSerializer(RecordSerializer.Context(key_serializer=StringSerializer(),
                                                           value_serializer=JsonSerializer()))
    topic_sender: TopicSender = TopicSender(TopicSender.Config("topic_with_jsons"),
                                            TopicSender.Context(
                                                serializer=serializer,
                                                producer=producer))

    await producer.async_init()
    await topic_sender.produce(key="test_key", value={'asd': 'val_0'})

    # produce() only adds the record to the send queue, so if the program ended here, no message would be published
    publish_tasks = [await topic_sender.produce(key="test_key_1", value={'asd': 'val_1'}),
                     await topic_sender.produce(key="test_key_2", value={'asd': 'val_2'}),
                     await topic_sender.produce(key="test_key_3", value={'asd': 'val_3'}),
                     await topic_sender.produce(key="test_key_1", value={'asd': 'val_4'})]
    await asyncio.gather(*publish_tasks)
    await producer.async_deinit()


if __name__ == '__main__':
    asyncio.run(amain())

Custom Deserializer and Serializer usage

import asyncio
import logging
import sys
import time
from dataclasses import dataclass, asdict, field
from enum import IntEnum
from typing import Optional

from kafka_tools.deserializer import RecordDeserializer, StringDeserializer, JsonDeserializer
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import RecordProcessor
from kafka_tools.serializer import StringSerializer, JsonSerializer, RecordSerializer

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)

ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


class Status(IntEnum):
    not_ok = 0
    ok = 1


@dataclass
class SomeData:
    id: int
    status: Status
    # use default_factory so each instance gets its own creation time
    # (a bare `= time.time()` default would be evaluated once, at class definition)
    created_at: int = field(default_factory=lambda: int(time.time()))


class SomeDataDeserializer(JsonDeserializer):
    def deserialize(self, s: bytes) -> SomeData:
        _dict = super().deserialize(s)
        return SomeData(_dict["id"], Status(_dict["status"]), _dict["created_at"])


class SomeDataSerializer(JsonSerializer):
    def serialize(self, some_dataclass: SomeData) -> bytes:
        _dict = asdict(some_dataclass)
        return super().serialize(_dict)


class StringToIntDeserializer(StringDeserializer):
    def deserialize(self, s: bytes) -> Optional[int]:
        value = super().deserialize(s)
        if value is not None:
            return int(value)


class IntToStringSerializer(StringSerializer):
    def serialize(self, value: int) -> bytes:
        return super().serialize(str(value))
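# A quick round-trip sanity check of the custom classes above (assuming the base
# JsonSerializer/JsonDeserializer simply encode and decode JSON bytes):
#   SomeDataDeserializer().deserialize(SomeDataSerializer().serialize(SomeData(1, Status.ok)))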


async def record_callback(record: KafkaRecord):
    logger.info(f"record_callback record: {record}, value type: {type(record.value)}")


async def amain():
    topic = "topic_with_dataclasses"
    kafka_address = "127.0.0.1:9092"
    group_id = "my_group_id"
    consumer = Consumer(Consumer.Config(address=kafka_address, group_id=group_id, topic_auto_offset_is_latest=False))

    deserializer = RecordDeserializer(RecordDeserializer.Context(key_deserializer=StringToIntDeserializer(),
                                                                 value_deserializer=SomeDataDeserializer()))

    processor = RecordProcessor(RecordProcessor.Config(topic=topic),
                                context=RecordProcessor.Context(consumer, deserializer),
                                record_callback=record_callback)

    serializer = RecordSerializer(RecordSerializer.Context(key_serializer=IntToStringSerializer(),
                                                           value_serializer=SomeDataSerializer()))

    producer = Producer(Producer.Config(kafka_address))

    topic_sender: TopicSender = TopicSender(TopicSender.Config(topic),
                                            TopicSender.Context(serializer=serializer, producer=producer))

    await consumer.async_init()
    await producer.async_init()
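    # The loop below produces 100 SomeData records; record_callback above logs
    # each one as the consumer reads it back from the same topic.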

    for i in range(0, 100):
        my_class = SomeData(i, Status(0))
        await topic_sender.produce(key=my_class.id, value=my_class)

    await asyncio.sleep(10)
    await producer.async_deinit()
    await consumer.async_deinit()
    logger.info(f"----===== Deinit done ====----")

BufferRecordProcessor usage

import asyncio
import logging
import sys

from kafka_tools.deserializer import RecordDeserializer, StringDeserializer
from kafka_tools.kafka_client.abstract_kafka_client import AbstractKafkaClient
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import BufferRecordProcessor
from kafka_tools.serializer import RecordSerializer, StringSerializer

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)

max_delay_between_process = 5
max_values_in_buffer = 2
buffer_callback_processing_time = 5
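# The buffer is flushed either when MAX_VALUES_IN_BUFFER records accumulate or
# after MAX_DELAY_BETWEEN_PROCESS seconds without a flush; the run below
# exercises both cases.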


async def buffer_callback(records: list[KafkaRecord]):
    logger.info(f"buffer_callback record: {len(records)}, records : {records}")
    await asyncio.sleep(buffer_callback_processing_time)
    logger.info(f"buffer_callback finished")


async def amain():
    topic = "test_buffer"
    kafka_address = "127.0.0.1:9092"
    group_id = "my_group_id"
    consumer = Consumer(Consumer.Config(kafka_address, group_id=group_id, topic_auto_offset_is_latest=False))
    deserializer_for_topic_2 = RecordDeserializer(
        RecordDeserializer.Context(StringDeserializer(), StringDeserializer()))

    buffer_record_processor = BufferRecordProcessor(
        config=BufferRecordProcessor.Config(topic=topic,
                                            MAX_VALUES_IN_BUFFER=max_values_in_buffer,
                                            MAX_DELAY_BETWEEN_PROCESS=max_delay_between_process),
        context=BufferRecordProcessor.Context(consumer, deserializer_for_topic_2), record_callback=buffer_callback)
    await buffer_record_processor.async_init()
    await consumer.async_init()

    producer: Producer = Producer(AbstractKafkaClient.Config(kafka_address))
    serializer = RecordSerializer(
        RecordSerializer.Context(key_serializer=StringSerializer(), value_serializer=StringSerializer()))
    topic_sender: TopicSender = TopicSender(TopicSender.Config(topic=topic),
                                            TopicSender.Context(serializer=serializer, producer=producer))

    await producer.async_init()
    for i in range(0, max_values_in_buffer * 2 + 1):
        key = "test_key_" + str(i)
        value = "value_" + str(i)
        await topic_sender.produce(key=key, value=value)

    await asyncio.sleep(20)

    for i in range(0, max_values_in_buffer):
        key = "after_sleep_key_" + str(i)
        value = "after_sleep_value_" + str(i)
        await topic_sender.produce(key=key, value=value)

    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(amain())

KafkaTopicDict usage

The topic should be preconfigured with `cleanup.policy=compact`; KafkaDictConsumer will create it with that setting if it does not exist.
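
If you prefer to create the topic yourself, a minimal sketch using kafka-python's admin API (partition and replication counts are illustrative; the address and topic name match the example below):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
admin.create_topics([NewTopic(name="test_kafka_dict",
                              num_partitions=1,
                              replication_factor=1,
                              topic_configs={"cleanup.policy": "compact"})])

The full example: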

import asyncio
import logging
import sys

from kafka import KafkaAdminClient

from kafka_tools.deserializer import RecordDeserializer, StringDeserializer, JsonDeserializer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_dict.kafka_dict import KafkaTopicDict
from kafka_tools.kafka_dict.kafka_dict_consumer import KafkaDictConsumer
from kafka_tools.serializer import StringSerializer, JsonSerializer, RecordSerializer

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.DEBUG))
ch = logging.StreamHandler(sys.stdout)

ch.setLevel(logging.getLevelName(logging.DEBUG))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)
logging.getLogger('kafka').setLevel(logging.WARNING)


async def on_new_key(key, value):
    logger.info(f"got new key: {key} with value: {value}")


async def on_update_key(key, value):
    logger.info(f" got update for key: {key} with value: {value}")


def on_delete_key(key):
    logger.info(f"on delete key: {key}")


async def amain():
    topic = "test_kafka_dict"
    kafka_address = "127.0.0.1:9092"
    deserializer = RecordDeserializer(RecordDeserializer.Context(key_deserializer=StringDeserializer(),
                                                                 value_deserializer=JsonDeserializer()))
    serializer = RecordSerializer(RecordSerializer.Context(key_serializer=StringSerializer(),
                                                           value_serializer=JsonSerializer()))
    producer = Producer(Producer.Config(kafka_address))

    kafka_admin_client = KafkaAdminClient(bootstrap_servers=kafka_address)

    # KafkaDictConsumer creates the topic with `--config "cleanup.policy=compact"` if it does not exist

    consumer = KafkaDictConsumer(KafkaDictConsumer.Config(kafka_address),
                                 KafkaDictConsumer.Context(kafka_admin_client=kafka_admin_client))

    kafka_dict = KafkaTopicDict(KafkaTopicDict.Config(topic=topic),
                                context=KafkaTopicDict.Context(serializer=serializer,
                                                               deserializer=deserializer,
                                                               producer=producer,
                                                               consumer=consumer))
    kafka_dict.subscribe_on_new_key(on_new_key)
    kafka_dict.subscribe_on_key_update(on_update_key)
    kafka_dict.subscribe_on_key_delete(on_delete_key)
    await producer.async_init()
    await consumer.async_init()
    await kafka_dict.async_init()
    logger.info(f"dict after init: {kafka_dict.copy()}")
    await kafka_dict.wait_all_msgs()
    logger.info(f"dict after wait_all_msgs: {kafka_dict.copy()}")
    kafka_dict.set("key_from_topic", {"val": {"created_at": 112356}})
    logger.info(f"common get after set: {kafka_dict.get('key_from_topic')}")
    logger.info(f"async get after set: {await kafka_dict.aget('key_from_topic')}")

    await kafka_dict.set_with_confirm("key_from_topic_1", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_2", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_3", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 112357}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 11235744}})
    val = await kafka_dict.pop_with_confirm("key_from_topic_5")
    logger.info(f" pop result: {val}")

    val = kafka_dict.get("key_from_topic_5")
    logger.info(f"get after pop: {val}")

    logger.info(kafka_dict.copy())
    await consumer.async_deinit()
    await producer.async_deinit()
    await kafka_dict.async_deinit()


if __name__ == '__main__':
    asyncio.run(amain())