mentortools/libs/: clickhouse-tools-abm-6.0.63778 metadata and description

Simple index

Tools to access data from ClickHouse.

author Vasya Svintsov
author_email v.svintsov@techokert.ru
classifiers
  • Programming Language :: Python :: 3
  • Programming Language :: Python :: 3.11
description_content_type text/markdown
requires_dist
  • async-tools-abm (>=2.1.57881,<3.0.0)
  • dict-caster-abm (>=1.0.50124,<2.0.0)
  • http-tools-abm (>=4.1.57828,<6.0.0)
  • more-itertools (>=10.1.0,<11.0.0)
  • qpel-tools-abm (>=1.0.53975,<2.0.0)
requires_python >=3.11,<4.0
File Tox results History
clickhouse_tools_abm-6.0.63778-py3-none-any.whl
Size
30 KB
Type
Python Wheel
Python
3
clickhouse_tools_abm-6.0.63778.tar.gz
Size
18 KB
Type
Source

pip installation

pip install --extra-index-url https://pypi2.abm-jsc.ru clickhouse-tools-abm

poetry installation

poetry source add --secondary abm-jsc https://pypi2.abm-jsc.ru
poetry add --source abm-jsc clickhouse-tools-abm@latest

Quick Start

Initialize the ClickHouse HTTP connector and interface

from aiohttp import ClientSession
from clickhouse_tools import ClickHouseConnector
from clickhouse_tools import ClickHouseInterface


async def init() -> ClickHouseInterface:
    """Build a ClickHouse HTTP connector and wrap it in an interface.

    Returns:
        The constructed ``ClickHouseInterface``, so it can be handed to the
        other quick-start coroutines that take a ``clickhouse_interface``
        argument.
    """
    # ----====ClickHouseConnector====----
    clickhouse_connector_config = ClickHouseConnector.Config(
        database="database", login="login", password="password",
        location="http://address:port", timeout_sec=100, max_connections=10)
    # NOTE(review): the ClientSession created here is never closed in this
    # snippet; real code should keep a reference and close it on shutdown.
    clickhouse_connector_context = ClickHouseConnector.Context(session=ClientSession())
    clickhouse_connector = ClickHouseConnector(config=clickhouse_connector_config, context=clickhouse_connector_context)

    # ----====ClickHouseInterface====----
    clickhouse_interface_context = ClickHouseInterface.Context(clickhouse_connector=clickhouse_connector)
    clickhouse_interface = ClickHouseInterface(context=clickhouse_interface_context)

    # Previously the interface was built and then dropped when the function
    # returned; hand it back so the caller can actually use it.
    return clickhouse_interface

Create and register entity

from clickhouse_tools import PartitionBy, Integer, DateTime
from clickhouse_tools import Column
from clickhouse_tools import convert_to_table
from clickhouse_tools import Table
from clickhouse_tools import ReplacingMergeTree
from clickhouse_tools import Representable
from clickhouse_tools import ClickHouseInterface


# Example entity declared with the library's Column descriptors and passed
# through the convert_to_table decorator.
# NOTE(review): the other examples address this entity by the string
# "restreamer_camera_timeline"; presumably that name is derived from the
# class name -- confirm against convert_to_table.
@convert_to_table
class RestreamerCameraTimeline(Table, Representable):
    # Integer column flagged primary_key=True, with PartitionBy.modulo(10).
    object_id = Column(Integer, primary_key=True, partition_by=PartitionBy.modulo(10))
    # Integer column flagged primary_key=True, with PartitionBy.divide(100000).
    timeslot_id = Column(Integer, primary_key=True, partition_by=PartitionBy.divide(100000))
    # DateTime column whose default is the SQL expression "now()".
    created_at = Column(DateTime, default_sql="now()")
    # Integer column with a default value of 0.
    state = Column(Integer, default=0)
    # Integer column declared nullable.
    error_code = Column(Integer, nullable=True)


# Select a ReplacingMergeTree engine built from the created_at column.
# NOTE(review): presumably created_at acts as the engine's version column --
# confirm against the library's ReplacingMergeTree wrapper.
RestreamerCameraTimeline.set_table_engine(ReplacingMergeTree(RestreamerCameraTimeline.created_at))


async def register_entity(clickhouse_interface: ClickHouseInterface):
    """Register the example entity, then run the interface's async init.

    Args:
        clickhouse_interface: Interface on which ``register_entity`` and
            ``async_init`` are called, in that order.
    """
    entity = RestreamerCameraTimeline
    clickhouse_interface.register_entity(entity)
    await clickhouse_interface.async_init()

Add

from clickhouse_tools import ClickHouseInterface


async def add(clickhouse_interface: ClickHouseInterface):
    """Send two example rows for "restreamer_camera_timeline" to ``add``.

    Args:
        clickhouse_interface: Interface whose ``add`` coroutine is awaited.
    """
    # Row that spells out every column explicitly.
    full_row = {'object_id': 80, 'timeslot_id': 5268090, 'created_at': 1636058970, 'state': 0, 'error_code': 0}
    # Row that omits created_at and state.
    # NOTE(review): presumably the column defaults fill these in -- confirm.
    partial_row = {'object_id': 80, 'timeslot_id': 5268091, 'error_code': 0}

    await clickhouse_interface.add(
        "restreamer_camera_timeline",
        values=[full_row, partial_row],
    )

Count

from clickhouse_tools import ClickHouseInterface


async def count(clickhouse_interface: ClickHouseInterface):
    """Call ``count`` with an ``object_id = 80`` filter and print the result.

    Args:
        clickhouse_interface: Interface whose ``count`` coroutine is awaited.
    """
    object_filter = {"attribute": "object_id", "operator": "=", "value": 80}
    amount = await clickhouse_interface.count(
        "restreamer_camera_timeline",
        filter_by=[object_filter],
    )
    print(f"restreamer_camera_timeliness_amount: {amount}")

Get

from clickhouse_tools import ClickHouseInterface


async def get(clickhouse_interface: ClickHouseInterface):
    """Call ``get`` with a filter, an ordering and ``limit=10``; print the result.

    Args:
        clickhouse_interface: Interface whose ``get`` coroutine is awaited.
    """
    object_filter = {"attribute": "object_id", "operator": "=", "value": 80}
    timeslot_ordering = {"column": "timeslot_id", "ascending": True}
    rows = await clickhouse_interface.get(
        "restreamer_camera_timeline",
        filter_by=[object_filter],
        order_by=[timeslot_ordering],
        limit=10,
    )
    print(f"restreamer_camera_timeliness: {rows}")