mentortools/libs/: kafka-tools-abm-2.0.56359a0 metadata and description


kafka tools

author i.vasiliev
author_email i.vasiliev@technokert.ru
classifiers
  • Programming Language :: Python :: 3
  • Programming Language :: Python :: 3.11
description_content_type text/markdown
requires_dist
  • aiokafka (>=0.8.0,<0.9.0)
  • async-tools-abm (>=1.0.53967,<2.0.0)
requires_python >=3.11,<4.0
Files
  • kafka_tools_abm-2.0.56359a0-py3-none-any.whl (18 KB, Python Wheel, Python 3)
  • kafka_tools_abm-2.0.56359a0.tar.gz (13 KB, Source)

Consumer usage

import asyncio
import logging
import sys

from async_tools import AsyncOnStart
from kafka_tools.utils.deserialize_json import deserialize_json
from kafka_tools.utils.deserialize_string import deserialize_string
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import RecordProcessor

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


async def callback_1(record: KafkaRecord):
    logger.info(f"callback_1 record: {record}, type: {type(record.value)}")
    await asyncio.sleep(1)


async def callback_2(record: KafkaRecord):
    logger.info(f"callback_2 record: {record}, type: {type(record.value)}")
    await asyncio.sleep(1)


async def main():
    consumer = Consumer(Consumer.Config("127.0.0.1:9092", group_id="my_group_id", topic_auto_offset_is_latest=False))
    # RecordProcessor calls consumer.subscribe() in its __init__
    processor_1 = RecordProcessor(
        RecordProcessor.Config(topic="my_topic"), context=RecordProcessor.Context(consumer),
        key_deserializer=deserialize_string, value_deserializer=deserialize_json, record_callback=callback_1
    )
    processor_2 = RecordProcessor(
        RecordProcessor.Config(topic="my_another_topic"), context=RecordProcessor.Context(consumer),
        key_deserializer=deserialize_string, value_deserializer=deserialize_string, record_callback=callback_2
    )
    
    await AsyncOnStart.start_if_necessary(consumer)
    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())
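
The example above runs indefinitely. For a clean shutdown you can bound the run and stop the consumer with the same AsyncOnStop helper used in the later examples; a minimal sketch of an alternative main() (the 60-second run time is an arbitrary choice, and the imports and callback come from the example above):

from async_tools import AsyncOnStop

async def main_bounded():
    consumer = Consumer(Consumer.Config("127.0.0.1:9092", group_id="my_group_id", topic_auto_offset_is_latest=False))
    RecordProcessor(
        RecordProcessor.Config(topic="my_topic"), context=RecordProcessor.Context(consumer),
        key_deserializer=deserialize_string, value_deserializer=deserialize_json, record_callback=callback_1
    )
    await AsyncOnStart.start_if_necessary(consumer)
    await asyncio.sleep(60)                        # consume for a bounded time instead of looping forever
    await AsyncOnStop.stop_if_necessary(consumer)  # stop the consumer cleanly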

TopicSender usage

import asyncio
import logging
import sys

from async_tools import AsyncOnStart, AsyncOnStop
from kafka_tools.kafka_client.abstract_kafka_client import AbstractKafkaClient

from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.utils.serialize_string import serialize_string
from kafka_tools.utils.serialize_json import serialize_json


logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


async def main():
    kafka_address = "127.0.0.1:9092"
    producer: Producer = Producer(AbstractKafkaClient.Config(address=kafka_address))
    topic_sender: TopicSender = TopicSender(
        config=TopicSender.Config("topic_with_jsons"),
        context=TopicSender.Context(producer=producer),
        key_serializer=serialize_string, value_serializer=serialize_json
    )

    await AsyncOnStart.start_if_necessary(producer)
    await topic_sender.produce(key="test_key", value={'asd': 'val_0'})

    # produce() only adds the message to the send queue, so if the program ended here, no message would actually be published
    publish_tasks = [await topic_sender.produce(key="test_key_1", value={'asd': 'val_1'}),
                     await topic_sender.produce(key="test_key_2", value={'asd': 'val_2'}),
                     await topic_sender.produce(key="test_key_3", value={'asd': 'val_3'}),
                     await topic_sender.produce(key="test_key_1", value={'asd': 'val_4'})]
    await asyncio.gather(*publish_tasks)
    await AsyncOnStop.stop_if_necessary(producer)


if __name__ == '__main__':
    asyncio.run(main())

Customize Deserializer and Serializer

import asyncio
import json
import logging
import sys
import time
from dataclasses import dataclass, asdict, field
from enum import IntEnum

from async_tools import AsyncOnStop, AsyncOnStart
from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import RecordProcessor

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)

ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)


class Status(IntEnum):
    not_ok = 0
    ok = 1


@dataclass
class SomeData:
    id: int
    status: Status
    created_at: int = field(default_factory=lambda: int(time.time()))  # evaluated per instance, not once at import time


def deserialize_some_data(payload: bytes | None) -> SomeData | None:
    if payload is None:  # tombstone / empty payload
        return None
    _dict = json.loads(payload.decode())
    return SomeData(_dict["id"], Status(_dict["status"]), _dict["created_at"])


def serialize_some_data(some_dataclass: SomeData) -> bytes:
    _dict = asdict(some_dataclass)
    return json.dumps(_dict).encode()


def deserialize_string_to_int(payload: bytes | None) -> int | None:
    if payload is None:  # tombstone / empty payload
        return None
    return int(payload.decode())


def serialize_int_to_string(int_: int) -> bytes:
    return str(int_).encode()


async def record_callback(record: KafkaRecord):
    logger.info(f"record_callback record: {record}, value type: {type(record.value)}")


async def main():
    topic = "topic_with_dataclasses"
    kafka_address = "127.0.0.1:9092"
    group_id = "my_group_id"
    consumer = Consumer(Consumer.Config(address=kafka_address, group_id=group_id, topic_auto_offset_is_latest=False))

    processor = RecordProcessor(
        config=RecordProcessor.Config(topic=topic),
        context=RecordProcessor.Context(consumer),
        key_deserializer=deserialize_string_to_int, value_deserializer=deserialize_some_data,
        record_callback=record_callback
    )

    producer = Producer(Producer.Config(kafka_address))

    topic_sender: TopicSender = TopicSender(
        config=TopicSender.Config(topic),
        context=TopicSender.Context(producer=producer),
        key_serializer=serialize_int_to_string, value_serializer=serialize_some_data
    )

    await AsyncOnStart.start_if_necessary(consumer)
    await AsyncOnStart.start_if_necessary(producer)

    for i in range(0, 100):
        my_class = SomeData(i, Status(0))
        await topic_sender.produce(key=my_class.id, value=my_class)

    await asyncio.sleep(10)
    await AsyncOnStop.stop_if_necessary(producer)
    await AsyncOnStop.stop_if_necessary(consumer)
    logger.info(f"----===== Deinit done ====----")

if __name__ == '__main__':
    asyncio.run(main())

BufferRecordProcessor usage

import asyncio
import logging
import sys

from async_tools import AsyncOnStart

from kafka_tools.kafka_client.consumer import Consumer
from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_client.topic_sender import TopicSender
from kafka_tools.record_processor.kafka_record import KafkaRecord
from kafka_tools.record_processor.record_processor import BufferRecordProcessor
from kafka_tools.kafka_client.abstract_kafka_client import AbstractKafkaClient
from kafka_tools.utils.deserialize_string import deserialize_string
from kafka_tools.utils.serialize_string import serialize_string


logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.INFO))
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.getLevelName(logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)

max_delay_between_process = 5
max_values_in_buffer = 2
buffer_callback_processing_time = 5


async def buffer_callback(records: list[KafkaRecord]):
    logger.info(f"buffer_callback record: {len(records)}, records : {records}")
    await asyncio.sleep(buffer_callback_processing_time)
    logger.info(f"buffer_callback finished")


async def main():
    topic = "test_buffer"
    kafka_address = "127.0.0.1:9092"
    group_id = "my_group_id"
    consumer = Consumer(Consumer.Config(kafka_address, group_id=group_id, topic_auto_offset_is_latest=False))

    buffer_record_processor = BufferRecordProcessor(
        config=BufferRecordProcessor.Config(topic=topic,
                                            MAX_VALUES_IN_BUFFER=max_values_in_buffer,
                                            MAX_DELAY_BETWEEN_PROCESS=max_delay_between_process),
        context=BufferRecordProcessor.Context(consumer), 
        key_deserializer=deserialize_string, value_deserializer=deserialize_string, record_callback=buffer_callback
    )
    await AsyncOnStart.start_if_necessary(buffer_record_processor)
    await AsyncOnStart.start_if_necessary(consumer)

    producer: Producer = Producer(AbstractKafkaClient.Config(kafka_address))
    topic_sender: TopicSender = TopicSender(
        config=TopicSender.Config(topic=topic),
        context=TopicSender.Context(producer=producer),
        key_serializer=serialize_string, value_serializer=serialize_string
    )

    await AsyncOnStart.start_if_necessary(producer)
    for i in range(0, max_values_in_buffer * 2 + 1):
        key = "test_key_" + str(i)
        value = "value_" + str(i)
        await topic_sender.produce(key=key, value=value)

    await asyncio.sleep(20)

    for i in range(0, max_values_in_buffer):
        key = "after_sleep_key_" + str(i)
        value = "after_sleep_value_" + str(i)
        await topic_sender.produce(key=key, value=value)

    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())

KafkaTopicDict usage

The topic should be preconfigured with cleanup.policy=compact (log compaction); KafkaDictConsumer will create it with this setting if it does not exist.
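
If you prefer to create the compacted topic up front instead of letting KafkaDictConsumer create it, a minimal sketch using kafka-python's admin client (the topic name matches the example below; the partition and replication counts are illustrative assumptions):

from kafka import KafkaAdminClient
from kafka.admin import NewTopic

admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
admin.create_topics([
    # compaction keeps the latest record per key, which is what the dict relies on
    NewTopic(name="test_kafka_dict", num_partitions=1, replication_factor=1,
             topic_configs={"cleanup.policy": "compact"})
])
admin.close()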

import asyncio
import logging
import sys

from kafka import KafkaAdminClient

from async_tools import AsyncOnStart, AsyncOnStop

from kafka_tools.kafka_client.producer import Producer
from kafka_tools.kafka_dict.kafka_dict import KafkaTopicDict
from kafka_tools.kafka_dict.kafka_dict_consumer import KafkaDictConsumer
from kafka_tools.utils.deserialize_string import deserialize_string
from kafka_tools.utils.deserialize_json import deserialize_json
from kafka_tools.utils.serialize_string import serialize_string
from kafka_tools.utils.serialize_json import serialize_json

logger = logging.getLogger(__file__)
root = logging.getLogger()
root.setLevel(logging.getLevelName(logging.DEBUG))
ch = logging.StreamHandler(sys.stdout)

ch.setLevel(logging.getLevelName(logging.DEBUG))
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
logging.getLogger('aiokafka').setLevel(logging.WARNING)
logging.getLogger('kafka').setLevel(logging.WARNING)


async def on_new_key(key, value):
    logger.info(f"got new key: {key} with value: {value}")


async def on_update_key(key, value):
    logger.info(f" got update for key: {key} with value: {value}")


def on_delete_key(key):
    logger.info(f"on delete key: {key}")


async def main():
    topic = "test_kafka_dict"
    kafka_address = "127.0.0.1:9092"
    producer = Producer(Producer.Config(kafka_address))

    kafka_admin_client = KafkaAdminClient(bootstrap_servers=kafka_address)

    # KafkaDictConsumer creates the topic with `--config "cleanup.policy=compact"` if it does not exist

    consumer = KafkaDictConsumer(KafkaDictConsumer.Config(kafka_address),
                                 KafkaDictConsumer.Context(kafka_admin_client=kafka_admin_client))

    kafka_dict = KafkaTopicDict(
        config=KafkaTopicDict.Config(topic=topic), 
        context=KafkaTopicDict.Context(producer=producer, consumer=consumer),
        key_serializer=serialize_string, value_serializer=serialize_json,
        key_deserializer=deserialize_string, value_deserializer=deserialize_json,
    )
    kafka_dict.subscribe_on_new_key(on_new_key)
    kafka_dict.subscribe_on_key_update(on_update_key)
    kafka_dict.subscribe_on_key_delete(on_delete_key)
    await AsyncOnStart.start_if_necessary(producer)
    await AsyncOnStart.start_if_necessary(consumer)
    await AsyncOnStart.start_if_necessary(kafka_dict)
    logger.info(f"dict after init: {kafka_dict.copy()}")
    await kafka_dict.wait_all_msgs()
    logger.info(f"dict after wait_all_msgs: {kafka_dict.copy()}")
    kafka_dict.set("key_from_topic", {"val": {"created_at": 112356}})
    logger.info(f"common get after set: {kafka_dict.get('key_from_topic')}")
    logger.info(f"async get after set: {await kafka_dict.aget('key_from_topic')}")

    await kafka_dict.set_with_confirm("key_from_topic_1", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_2", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_3", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 112356}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 112357}})
    await kafka_dict.set_with_confirm("key_from_topic_5", {"val": {"created_at": 11235744}})
    val = await kafka_dict.pop_with_confirm("key_from_topic_5")
    logger.info(f" pop result: {val}")

    val = kafka_dict.get("key_from_topic_5")
    logger.info(f"get after pop: {val}")

    logger.info(kafka_dict.copy())
    await AsyncOnStop.stop_if_necessary(consumer)
    await AsyncOnStop.stop_if_necessary(producer)
    await AsyncOnStop.stop_if_necessary(kafka_dict)


if __name__ == '__main__':
    asyncio.run(main())