pip installation
pip install --extra-index-url https://pypi2.abm-jsc.ru clickhouse-tools-abm
poetry installation
poetry source add --secondary abm-jsc https://pypi2.abm-jsc.ru
poetry add --source abm-jsc clickhouse-tools-abm@latest
Quick Start
Init ClickHouse http-connector and interface
from aiohttp import ClientSession
from clickhouse_tools import ClickHouseConnector
from clickhouse_tools import ClickHouseInterface
async def init():
# ----====ClickHouseConnector====----
clickhouse_connector_config = ClickHouseConnector.Config(
database="database", login="login", password="password",
location="http://address:port", timeout_sec=100, max_connections=10)
clickhouse_connector_context = ClickHouseConnector.Context(session=ClientSession())
clickhouse_connector = ClickHouseConnector(config=clickhouse_connector_config, context=clickhouse_connector_context)
# ----====ClickHouseInterface====----
clickhouse_interface_context = ClickHouseInterface.Context(clickhouse_connector=clickhouse_connector)
clickhouse_interface = ClickHouseInterface(context=clickhouse_interface_context)
Create and register entity
from clickhouse_tools import PartitionBy, Integer, DateTime
from clickhouse_tools import Column
from clickhouse_tools import convert_to_table
from clickhouse_tools import Table
from clickhouse_tools import ReplacingMergeTree
from clickhouse_tools import Representable
from clickhouse_tools import ClickHouseInterface
@convert_to_table
class RestreamerCameraTimeline(Table, Representable):
object_id = Column(Integer, primary_key=True, partition_by=PartitionBy.modulo(10))
timeslot_id = Column(Integer, primary_key=True, partition_by=PartitionBy.divide(100000))
created_at = Column(DateTime, default_sql="now()")
state = Column(Integer, default=0)
error_code = Column(Integer, nullable=True)
RestreamerCameraTimeline.set_table_engine(ReplacingMergeTree(RestreamerCameraTimeline.created_at))
async def register_entity(clickhouse_interface: ClickHouseInterface):
clickhouse_interface.register_entity(RestreamerCameraTimeline)
await clickhouse_interface.async_init()
Add
from clickhouse_tools import ClickHouseInterface
async def add(clickhouse_interface: ClickHouseInterface):
await clickhouse_interface.add(
"restreamer_camera_timeline",
values=[{'object_id': 80, 'timeslot_id': 5268090, 'created_at': 1636058970, 'state': 0, 'error_code': 0},
{'object_id': 80, 'timeslot_id': 5268091, 'error_code': 0}]
)
Count
from clickhouse_tools import ClickHouseInterface
async def count(clickhouse_interface: ClickHouseInterface):
restreamer_camera_timeliness_amount = await clickhouse_interface.count(
"restreamer_camera_timeline",
filter_by=[{"attribute": "object_id", "operator": "=", "value": 80}]
)
print(f"restreamer_camera_timeliness_amount: {restreamer_camera_timeliness_amount}")
Get
from clickhouse_tools import ClickHouseInterface
async def get(clickhouse_interface: ClickHouseInterface):
restreamer_camera_timeliness = await clickhouse_interface.get(
"restreamer_camera_timeline",
filter_by=[{"attribute": "object_id", "operator": "=", "value": 80}],
order_by=[{"column": "timeslot_id", "ascending": True}],
limit=10
)
print(f"restreamer_camera_timeliness: {restreamer_camera_timeliness}")