mentortools/libs/: prometheus-tools-abm-5.1.60091 metadata and description

Simple index Newer version available

tools to collect metrics to prometheus

author Vasya Svintsov
author_email v.svintsov@techokert.ru
classifiers
  • Programming Language :: Python :: 3
  • Programming Language :: Python :: 3.11
description_content_type text/markdown
requires_dist
  • http-tools-abm (>=5.3.59932,<6.0.0)
  • more-itertools (>=10.2.0,<11.0.0)
  • prometheus-client (>=0.17.0,<1)
requires_python >=3.11,<4.0
File Tox results History
prometheus_tools_abm-5.1.60091-py3-none-any.whl
Size
8 KB
Type
Python Wheel
Python
3
prometheus_tools_abm-5.1.60091.tar.gz
Size
6 KB
Type
Source

Quick Start

EXAMPLE 1: Basic usage

import random
from prometheus_client import generate_latest
from prometheus_tools.metrics_registry import MetricsRegistry


metrics = MetricsRegistry(config=MetricsRegistry.Config())
temperature_measurements_buckets = tuple(range(10, 30))
for i in range(10):
    temperature = random.randint(15, 25)
    metrics.gauge('temperature').set(temperature)
    metrics.counter('temperature_measurements_amount').inc()
    metrics.histogram('temperature_measurements', buckets=temperature_measurements_buckets).observe(temperature)

print(generate_latest(metrics._registry))

EXAMPLE 2: Basic usage with labels

import random
from prometheus_client import generate_latest
from prometheus_tools.metrics_registry import MetricsRegistry


metrics = MetricsRegistry(config=MetricsRegistry.Config())
temperature_measurements_buckets = tuple(range(10, 30))
for i in range(10):
    for room_id in range(4):
        temperature = random.randint(15, 25)
        labels = {'room_id': room_id}
        metrics.gauge('temperature', labels).set(temperature)
        metrics.counter('temperature_measurements_amount', labels).inc()
        metrics.histogram('temperature_measurements', temperature_measurements_buckets, labels).observe(temperature)

print(generate_latest(metrics._registry))

EXAMPLE 4: Track without labels

import random
import time
from prometheus_client import generate_latest
from prometheus_tools.metrics_registry import MetricsRegistry


def sleeper() -> None:
    time.sleep(random.random() * 2)


metrics = MetricsRegistry(config=MetricsRegistry.Config())
for i in range(5):
    with metrics.track('sleeper'):
        sleeper()

print(generate_latest(metrics._registry))

EXAMPLE 4: Track with labels

import random
from prometheus_client import generate_latest
from prometheus_tools.metrics_registry import MetricsRegistry


def generate_pdf_report() -> bool:
    return bool(random.randint(0, 1))


def generate_xlsx_report() -> bool:
    return bool(random.randint(0, 1))


metrics = MetricsRegistry(config=MetricsRegistry.Config())
for report_type, handler in (('pdf', generate_pdf_report), ('xlsx', generate_xlsx_report)):
    labels = {'type': report_type}
    except_labels = {'status': None, 'error': None}
    with metrics.track('report_generation', labels, except_labels=except_labels):
        try:
            generation_status = handler()
            labels['status'] = generation_status
        except Exception as er:
            except_labels['error'] = type(er).__name__

print(generate_latest(metrics._registry))

EXAMPLE 5: Initer

from dataclasses import dataclass
import init_helpers
from aiohttp import ClientSession
from async_tools import AsyncInitable, AsyncDeinitable
from http_tools import HttpServer
from prometheus_tools.prometheus_controller import PrometheusController


@dataclass
class Initer:
    @dataclass
    class Config:
        http_server: HttpServer.Config
        metrics: PrometheusController.Config

    config: Config

    @dataclass
    class Context(AsyncInitable, AsyncDeinitable):
        instance_id: str = "Example"
        http_server: HttpServer = None
        metrics: PrometheusController = None
        session: ClientSession = None

        def __post_init__(self) -> None:
            AsyncInitable.__init__(self)
            AsyncDeinitable.__init__(self)

    context: Context

    def __init__(self) -> None:
        self.config = init_helpers.parse_args(config_file=init_helpers.Arg.ini_file_to_dataclass(self.Config))
        self.context = self.Context()

    async def __aenter__(self) -> None:
        self.context.http_server = HttpServer(self.config.http_server, self.context)
        self.context.metrics = PrometheusController(self.config.metrics, self.context)
        self.session = self.context.metrics.get_monitored_client_session()
        await self.context.async_init()

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self.context.session is not None:
            await self.context.session.close()
        await self.context.async_deinit()