mentortools/libs/: openapi-tools-abm-7.1.76202a0 metadata and description

Simple index Stable version available

author Mike Orlov
author_email m.orlov@technokert.ru
classifiers
  • Programming Language :: Python :: 3
  • Programming Language :: Python :: 3.11
description_content_type text/markdown
requires_dist
  • aiohttp (>=3.11.13,<4.0.0)
  • apispec (>=6.8.0,<7.0.0)
  • async-tools-abm (>=2.1.61556,<3.0.0)
  • authlib (>=1.3.2,<2.0.0)
  • cryptography (>=46.0.2,<47.0.0)
  • dict-caster-abm (>=1.0.49813,<2.0.0)
  • dynamic-types-abm (>=1.2.71654,<2.0.0)
  • frozendict (>=2.4.6,<3.0.0)
  • furl (>=2.1.3,<3.0.0)
  • http-tools-abm (>=6.2.72577,<7.0.0)
  • init-helpers-abm (>=3.1.71726,<4.0.0)
  • more-itertools (>=10.8.0,<11.0.0)
  • pyyaml (>=6.0.2,<7.0.0)
requires_python >=3.11,<4.0
File Tox results History
openapi_tools_abm-7.1.76202a0-py3-none-any.whl
Size
1 MB
Type
Python Wheel
Python
3
openapi_tools_abm-7.1.76202a0.tar.gz
Size
1 MB
Type
Source

OpenAPI tools

Библиотека позволяет без изменений в логике публиковать её методы в OpenAPI спецификации и HTTP сервере

Копирование кода для тестов
cp -r $ORIGIN_DIR/openapi_tools ./openapi_tools

Примеры использования

Минимальный пример: эндпоинт без аргументов

Допустим, мы хотим опубликовать эндпоинт по получению температуры CPU (захардкодим)

import asyncio
from http_tools import HttpServer, JsonableAnswer, HttpStatusCode
from openapi_tools import OpenApiServer, Endpoint


# функция, которую мы хотим опубликовать
def get_temp():  # сигнатура функции может быть не типизирована 
  return 42


async def main():
  # бойлерплейт
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config('1'),
                             OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  # Регистрация эндпоинта
  api_server.register(Endpoint(get_temp, "GET", "/temperature", [], {}, JsonableAnswer[HttpStatusCode.OK][int]))
  # запуск сервера
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Получим варнинг, что в эндпоинте нет проверки доступа:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)

...: UserWarning: Endpoints without securities will be prohibited
  warnings.warn('Endpoints without securities will be prohibited')

Можно проверить эндпоинт с помощью запроса из консоли:

curl --location "http://0.0.0.0:8888/temperature"

В ответ получим 200 ОК с телом

42

Так же можно получить спецификацию API:

curl --location "http://0.0.0.0:8888/.docs/openapi.json"

В ответ получим 200 ОК со спецификацией:

{
  "openapi": "3.1.0",
  "info": {"title": "ABM:openapi:readme", "version": "1"},
  "paths": {
    "/temperature": {
      "get": {
        "operationId": "get_temp",
        "responses": {
          "200": {
            "description": "",
            "content": {
              "application/json": {"schema": {"type": "integer"}}
            }
          },
          "default": {
            "description": "",
            "content": {
              "application/json": {"schema": {
                "$ref": "#/components/schemas/ErrorDescription"
              }}
            }
          }
        }
      }
    }
  }, 
  "components": {
    "schemas": {
      "ErrorDescription": {
        "type": "object",
        "properties": {
          "error": {"type": "string"},
          "error_type": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null},
          "error_code": {"anyOf": [{"type": "integer"}, {"type": "null"}], "default": null},
          "done": {"type": "boolean", "default": false}
        },
        "required": ["error"],
        "additionalProperties": false
      }
    }
  }
}
{
  "openapi": "3.1.0",
  "info": {"title": "ABM:openapi:readme", "version": "1"},
  "paths": {
    "/temperature": {
      "get": {
        "operationId": "get_temp",
        "responses": {
          "200": {
            "description": "",
            "content": {
              "application/json": {"schema": {"type": "integer"}}
            }
          },
          "default": {
            "description": "",
            "content": {
              "application/json": {"schema": {"$ref": "#/components/schemas/schema_ErrorDescription"}}
            }
          }
        }
      }
    }
  },
  "components": {
    "schemas": {
      "schema_ErrorDescription": {
        "type": "object",
        "properties": {
          "error": {"type": "string"},
          "error_type": {"anyOf": [{"type": "string"}, {"type": "null"}]},
          "error_code": {"anyOf": [{"type": "integer"}, {"type": "null"}]},
          "done": {"type": "boolean"}
        }
      }
    }
  }
}

Рассмотрим подробнее код регистрации:
api_server.register(RpcEndpoint(get_now, "GET", "/now", [], {}, JsonableAnswer[HttpStatusCode.OK][int]))

В данном примере наш ответ имеет статус код 200 OK, тип содержимого application/json и структура - один int

Пример с параметрами

В реальности у эндпоинтов почти всегда есть входные параметры.
Допустим, мы хотим опубликовать метод по возведению числа в степень

import math
import asyncio
from http_tools import HttpServer, WrappedAnswer, HttpStatusCode
from openapi_tools import OpenApiServer, Endpoint, PathParameter, SecurityRequirement


def power(base: float, exponent: float) -> float:
  return pow(base, exponent)


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  # initialise empty security requirement to allow anyone to access this endpoint
  no_security = SecurityRequirement()
  api_server.register(Endpoint(
    power, "GET", "/power/{value}/{exp}", [no_security], {
      # ожидаем параметр 'arg' типа int в HTTP пути запроса и передаём его в аргумент 'value' обработчика  
      'base': PathParameter(name='value', schema=float),
      'exponent': PathParameter(name='exp', schema=float),
    },
    # Мы используем подход, что в теле json ответов всегда dict, содержащий метаинформацию об ответе, 
    # например, поле 'done', а данные лежат под ключом result. 
    # Всю эту логику реализует WrappedAnswer, дальше будет использовать его       
    WrappedAnswer[HttpStatusCode.OK][float]
  ))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Можно проверить эндпоинт с помощью запроса из консоли:

curl --location "http://0.0.0.0:8888/power/7/0.5"

Получим следующий ответ:

{"done": true, "result": 2.6457513110645907}

Если пошлём некорректный тип:

curl --location "http://0.0.0.0:8888/power/bad/value"

то библиотека попробует скастить его в требуемый тип и мы получим следующий ответ:

{"error": "could not convert string to float: 'bad'", "error_type": "ValueError", "error_code": null, "done": false}

Однако, если мы сделаем запрос с отрицательным числом

curl --location "http://0.0.0.0:8888/power/-1/0.5"

то получим код 500 и довольно невнятную ошибку:

{
  "done": false,
  "error_code": null, 
  "error_type": "FieldErrors", 
  "error": "{'result': TypeError(\"float() argument must be a string or a real number, not 'complex'\")}"
}

Это связано с тем, что pow возвращает значение типа complex для отрицательных оснований при нецелых степенях, а у нас указан возращаемый тип float. Будем возвращать ошибку для таких значений.

Пример обработки исключений

Доработаем обработку ошибок, для этого:

import math
import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, WrappedAnswer
from openapi_tools import OpenApiServer, Endpoint, PathParameter, SecurityRequirement


def power(base: float, exponent: float) -> float:
  assert base >= 0 or int(exponent) == exponent, f'Negative bases with non integer exponents are prohibited'
  return pow(base, exponent)


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  no_security = SecurityRequirement()
  api_server.register(Endpoint(
    power, "GET", "/power/{base}/{exponent}", [no_security], {
      # Можно использовать одинаковые названия для аргументов вызова функции и параметров эндпоинта 
      'base': PathParameter(name='base', schema=float),
      'exponent': PathParameter(name='exponent', schema=float),
    },
    WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
      AssertionError: ExceptionAnswer[HttpStatusCode.BadRequest]
    }))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Проверим ответ на отрицательное число с дробной степенью:

curl --location "http://0.0.0.0:8888/power/-1/0.5"

Получим код 400 и более понятную ошибку:

{
  "done": false,
  "error_code": null,
  "error_type": "AssertionError",
  "error": "Negative bases with non integer exponents are prohibited"  
}

Если же требуется автоматизировать обработку различных ошибок на клиенте, то в ответе стоит использовать http_tools.ErrorCode

Пример использования кодов ошибок

Для демонстрации допустим, что sqrt не может вычислять корень слишком больших значений
Создадим отдельные исключения и для каждого из них укажем ErrorCode

import math
import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, Endpoint, PathParameter, SecurityRequirement


class NegativeExponentWithZeroBase(ValueError):
  pass


class NegativeValue(ValueError):
  pass


def power(base: float, exponent: float) -> float:
  if base == 0 and exponent < 0:
    raise NegativeExponentWithZeroBase(f'Zero base with negative exponents are prohibited, got: {exponent}')
  if base < 0 and int(exponent) != exponent:
    raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
  return pow(base, exponent)


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  no_security = SecurityRequirement()
  api_server.register(Endpoint(
    power, "GET", "/power/{base}/{exponent}", [no_security], {
      'base': PathParameter(name='base', schema=float),
      'exponent': PathParameter(name='exponent', schema=float),
    },
    WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
      NegativeExponentWithZeroBase: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
      NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
    }))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Проверим ответ на большое число:

curl --location "http://0.0.0.0:8888/power/0/-0.5"

Получим код 400 и ошибку с кодом:

{"error": "Zero base with negative exponents are prohibited, got: -0.5", "error_type": "NegativeExponentWithZeroBase", "error_code": 1, "done": false}

Проверим ответ на отрицательное число с дробноой степенью:

curl --location "http://0.0.0.0:8888/power/-1/0.5"

Получим код 400 и ошибку с кодом:

{"error": "With fractional exponent(0.5) only non negative bases allowed, got: -1.0", "error_type": "NegativeValue", "error_code": 2, "done": false}

Однако, данный подход(коды ошибок указываются напрямую) может приводить к путанице при увеличении размеров API - в разных эндпоинтах один и тот же код может обозначать разное.

Пример наделения одного кода ошибки разными смыслами

Проиллюстрируем проблему, добавив ещё одну функцию бизнес-логики по возведению числа в степень, и опубликуем её в апи

import math
import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, Endpoint, PathParameter, SecurityRequirement


class NegativeExponentWithZeroBase(ValueError):
  pass


class NegativeValue(ValueError):
  pass


def power(base: float, exponent: float) -> float:
  if base == 0 and exponent < 0:
    raise NegativeExponentWithZeroBase('Zero base with negative exponents are prohibited')
  if base < 0 and int(exponent) != exponent:
    raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
  return pow(base, exponent)


def sqrt(value: int) -> float:
  if value < 0:
    raise NegativeValue(f'Only non negative values allowed, got: {value}')
  return math.sqrt(value)


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  no_security = SecurityRequirement()
  api_server.register(Endpoint(
    sqrt, "GET", "/sqrt/{arg}", [no_security], {'value': PathParameter(name='value', schema=int)},
    WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
      NegativeExponentWithZeroBase: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
      NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
    }))
  api_server.register(Endpoint(
    power, "GET", "/pow/{base}/{exponent}", [no_security], {
      'base': PathParameter(name='base', schema=float),
      'exponent': PathParameter(name='exponent', schema=float),
    },
    WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
      NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
    }))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Проверим эндпоинт /math/pow:

curl "http://0.0.0.0:8888/pow/-1/-2.1"

Получим код 400 и ошибку с кодом 1:

{"error": "With fractional exponent(-2.1) only non negative bases allowed, got: -1.0", "error_type": "NegativeValue", "error_code": 1, "done": false}

При этом получается, что одна и та же проблема в разных эндпоинтах имеет разные коды, что может путать разработчиков и усложнять код клиентского приложения.
Есть два подхода для составления API, более единообразного в плане кодов ошибок:

  1. Именованный реестр кодов ошибок
  2. Единый словарь обработки исключений

Пример использования именованного реестра кодов

Создадим enum со своим перечнем кодов ошибок, тогда при использовании это будут не просто числа, а именованные параметры

import enum
import math
import asyncio

import http_tools
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, WrappedAnswer
from openapi_tools import OpenApiServer, Endpoint, PathParameter, SecurityRequirement


class NegativeExponentWithZeroBase(ValueError):
  pass


class NegativeValue(ValueError):
  pass


def sqrt(value: int) -> float:
  if value < 0:
    raise NegativeValue(f'Only non negative values allowed, got: {value}')
  return math.sqrt(value)


@enum.unique
class ErrorCode(http_tools.answer.ErrorCode, enum.Enum):
  negative_value = 1
  negative_exponent = 2


def power(base: float, exponent: float) -> float:
  if base == 0 and exponent < 0:
    raise NegativeExponentWithZeroBase('Zero base with negative exponents are prohibited')
  if base < 0 and int(exponent) != exponent:
    raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
  return pow(base, exponent)


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  no_security = SecurityRequirement()
  api_server.register(Endpoint(
    sqrt, "GET", "/sqrt/{arg}", [no_security], {'value': PathParameter(name='arg', schema=int)},
    WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
      NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_value],
    }))
  api_server.register(Endpoint(
    power, "GET", "/pow/{base}/{exponent}", [no_security], {
      'base': PathParameter(name='base', schema=float),
      'exponent': PathParameter(name='exponent', schema=float),
    },
    WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
      NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_value],
      NegativeExponentWithZeroBase: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_exponent],
    }))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Такой подход требует минимальных изменений и увеличивает читаемость, но и с ним возможны ошибки - одно и то же исключение в разных эндпоинтах может отображаться на разные коды

Пример единого словаря обработки исключений

Мы можем собрать единый словарь со всеми типами исключений и выбирать из него необходимые для каждого эндпоинта
ВНИМАНИЕ, передавать этот словарь целиком при каждой регистрации обработчика НЕЛЬЗЯ
Это приведёт к формированию спецификации с огромным количеством лишних ответов

import math
import typing
import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, Endpoint, PathParameter, SecurityRequirement


class NegativeExponentWithZeroBase(ValueError):
  pass


class NegativeValue(ValueError):
  pass


def sqrt(value: int) -> float:
  if value < 0:
    raise NegativeValue(f'Only non negative values allowed, got: {value}')
  return math.sqrt(value)


def power(base: float, exponent: float) -> float:
  if base == 0 and exponent < 0:
    raise NegativeExponentWithZeroBase('Zero base with negative exponents are prohibited')
  if base < 0 and int(exponent) != exponent:
    raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
  return pow(base, exponent)


class PickableDict(dict):
  def pick(self, keys: typing.Iterable) -> dict:
    """Возвращает новый словарь только с выбранными ключами."""
    return {k: self[k] for k in keys if k in self}


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  no_security = SecurityRequirement()
  exception_to_answer = PickableDict({
    NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
    NegativeExponentWithZeroBase: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
  })
  api_server.register(Endpoint(
    sqrt, "GET", "/sqrt/{arg}", [no_security], {'value': PathParameter(name='arg', schema=int)},
    WrappedAnswer[HttpStatusCode.OK][float], exception_to_answer.pick({NegativeValue})))
  api_server.register(Endpoint(
    power, "GET", "/pow/{base}/{exponent}", [no_security], {
      'base': PathParameter(name='base', schema=float),
      'exponent': PathParameter(name='exponent', schema=float),
    },
    WrappedAnswer[HttpStatusCode.OK][float], exception_to_answer.pick({NegativeValue, NegativeExponentWithZeroBase})))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Этот способ на данный момент является рекомендуемым при использовании кодов ошибок в ответах

Пример предобработки аргументов обработчика

В прошлых примерах можно было увидеть, что параметры запроса прямо отображаются в аргументы обработчика, но иногда требуется предобработка.
Рассмотрим пример, где мы публикуем функцию, считающую количество знаков в строке
Но перед подсчётом мы хотим удалить пробелы в начале и в конце строки
Принимать строку будет из query части запроса
НЕРАБОЧИЙ пример, пояснение ниже

import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, Endpoint, QueryParameter, CallTemplate, SecurityRequirement


def get_len(value: str) -> int:
  return len(value)


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  no_security = SecurityRequirement()
  api_server.register(Endpoint(
    get_len, "GET", "/len", [no_security], {'value': QueryParameter(name='value', schema=str)},
    WrappedAnswer[HttpStatusCode.OK][int]
  ))
  api_server.register(Endpoint(
    get_len, "GET", "/len/strip", [no_security], 
    {'value': CallTemplate(str.strip, QueryParameter(name='value', schema=str))},
    WrappedAnswer[HttpStatusCode.OK][int]
  ))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Это НЕРАБОЧИЙ пример: каждый эндпоинт имеет "имя"(operationId) и это имя должно быть уникальным на сервере.
По дефолту в качестве имени используется __name__ обработчика.
Поэтому, если один обработчик требуется зарегистрировать более одного раза, то требуется явно указывать его operation_id.
Перепишем пример:

import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, Endpoint, QueryParameter, CallTemplate, SecurityRequirement


def get_len(value: str) -> int:
  return len(value)


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  no_security = SecurityRequirement()
  api_server.register(Endpoint(
    get_len, "GET", "/len", [no_security], {'value': QueryParameter(name='value', schema=str)},
    WrappedAnswer[HttpStatusCode.OK][int]
  ))
  api_server.register(Endpoint(
    get_len, "GET", "/len/strip", [no_security], 
    {'value': CallTemplate(str.strip, QueryParameter(name='value', schema=str))},
    WrappedAnswer[HttpStatusCode.OK][int], operation_id='get_len__stripped'
  ))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Проверим эндпоинт /len (пробел в урле обозначается символом '+'):

curl "http://0.0.0.0:8888/len?value=+test"

Получим {"done": true, "result": 5}
Проверим эндпоинт /len/strip:

curl "http://0.0.0.0:8888/len/strip?value=+test"

Получим {"done": true, "result": 4}

Пример более сложной предобработки аргументов обработчика

Рассмотрим пример, когда нам нужно опубликовать эндпоинт по расчёту расстояния между двумя точками (датаклассами)
Датаклассы можно создать из json строки, переданной в query части запроса, но это преобразование нужно описывать явно
Или можем принимать их из json тела. Реализуем оба варианта

import json
import dataclasses
import asyncio

from init_helpers.dict_to_dataclass import dict_to_dataclass
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, Endpoint, QueryParameter, CallTemplate, JsonBodyParameter, SecurityRequirement


@dataclasses.dataclass
class Point:
  x: int
  y: int


def get_dist(first: Point, second: Point) -> float:
  return ((first.x - second.x) ** 2 + (first.y - second.y) ** 2) ** 0.5


def parse_json_as_point(value: str) -> Point:
  return dict_to_dataclass(json.loads(value), Point)


async def main():
  http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
  api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
  no_security = SecurityRequirement()
  api_server.register(Endpoint(
    get_dist, "GET", "/dist", [no_security], {
      'first': CallTemplate(parse_json_as_point, QueryParameter(name='first', schema=str)),
      'second': CallTemplate(parse_json_as_point, QueryParameter(name='second', schema=str)),
    }, WrappedAnswer[HttpStatusCode.OK][float],
  ))
  api_server.register(Endpoint(
    get_dist, "GET", "/dist/body", [no_security], {
      'first': JsonBodyParameter(name='first', schema=Point),
      'second': JsonBodyParameter(name='second', schema=Point),
    }, WrappedAnswer[HttpStatusCode.OK][float], operation_id='get_dist_body'
  ))
  async with http_server:
    await asyncio.sleep(10 ** 10)


asyncio.run(main())

Проверим эндпоинт /dist:

curl -X GET --location "http://0.0.0.0:8888/dist?first=%7B%22x%22%3A1%2C%22y%22%3A2%7D&second=%7B%22x%22%3A3%2C%22y%22%3A4%7D"

Проверим эндпоинт /dist/body:

curl -X GET --location "http://0.0.0.0:8888/dist/body" \
    -H "Content-Type: application/json" \
    -d '{"first":{"x":1,"y":2},"second":{"x":3,"y":4}}'

Ответ одинаковый для обоих запросов: {"done": true, "result": 2.8284271247461903}

Пример автогенерации параметров запроса и типа ответа

В прошлых примерах можно заметить, что если не используется предобработка, то параметры, указываемые при регистрации, повторяют имена и типы аргументов обработчика, а ответ использует возвращаемый тип в качестве схемы
Для сокращения кода есть функции, генерирующие объект RpcEndpoint на основе сигнатуры обработчика
Используем эндпоинты без предобработки из прошлых примеров (/len для расчёта длины строки и /dist для расстояния между точками)

import dataclasses
import asyncio

from http_tools import HttpServer
from openapi_tools import OpenApiServer, gen_json_endpoint, gen_query_endpoint, SecurityRequirement

@dataclasses.dataclass
class Point:
    x: int
    y: int

def get_dist(first: Point, second: Point) -> float:
    return ((first.x - second.x) ** 2 + (first.y - second.y) ** 2) ** 0.5

def get_len(value: str) -> int:
    return len(value)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    no_security = SecurityRequirement()
    api_server.register(gen_json_endpoint(get_dist, "GET", "/dist", [no_security]))
    api_server.register(gen_query_endpoint(get_len, "GET", "/len", [no_security]))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Проверим эндпоинт /dist:

curl -X GET --location "http://0.0.0.0:8888/dist" \
    -H "Content-Type: application/json" \
    -d '{"first":{"x":1,"y":2},"second":{"x":3,"y":4}}'

Проверим эндпоинт /len:

curl "http://0.0.0.0:8888/len?value=test"

Аутентификация и авторизация

Базовая аутентификация

В прошлых примерах можно увидеть при регистрации эндпоинта [SecurityRequirement()].
Это список объектов, описывающих проверку прав доступа.
Чтобы ограничить доступ к какому-то эндпоинту достаточно при регистрации указать SecurityScheme или SecurityRequirement.
SecurityScheme описывает, как проверяется секрет (токен/ключ) и что является результатом проверки, а SecurityRequirement описывает, какие SecurityScheme используются совместно и применяет требования/извлекает значения из результатов работы SecurityScheme.

Рассмотрим простейший пример - аутентификацию по фиксированному секрету

import math
import asyncio
from init_helpers import raise_if
from http_tools import HttpServer
from openapi_tools import OpenApiServer, ApiKeySecurityScheme, gen_query_endpoint, ParameterLocation, Unauthorized 

def sqrt(value: int) -> float:
    return math.sqrt(value)
    
async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    # Опишем resolver (способ проверки токена). Он должен возвращать информацию о пользователе и список его прав
    # Для простоты захардкодим токен, будем возвращать None в качестве информации о пользователе и пустой набор прав
    def resolver(token: str) -> tuple[None,  list[str]]:
        raise_if(token != 'secret', Unauthorized(f'{token=} is invalid'))
        return None, []
    
    # Создадим схему безопасности, требующую передачу токена в query части запроса под именем 'name', применим resolver  
    api_key_scheme = ApiKeySecurityScheme[None](location=ParameterLocation.query, name='token', resolver=resolver)
    # Регистрируем эндпоинт с этой схемой
    api_server.register(gen_query_endpoint(sqrt, "get", "/sqrt", [api_key_scheme],))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Делаем запрос без токена

curl "http://0.0.0.0:8888/sqrt?value=4"

Получаем в ответ 401 Unauthorized с телом. В теле описано, какие варианты авторизации были проверены(api_key_in_query_token[]) и почему не подошли(Unauthorized()):

{
  "error": {
    "api_key_in_query_token[]": "Unauthorized()"
  }, 
  "error_type": "ErrorGroup", 
  "error_code": null, 
  "done": false
}

Сделаем запрос с неверным токеном

curl "http://0.0.0.0:8888/sqrt?value=4&token=qqq"

Получаем в ответ 401 Unauthorized. В теле видим исключение из resolver'а(Unauthorized("token='qqq' is invalid")):

{
  "error": {
    "api_key_in_query_token[]": "Unauthorized(\"token='qqq' is invalid\")"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Сделаем запрос с верным токеном

curl "http://0.0.0.0:8888/sqrt?value=4&token=secret"

Получим 200 OK с телом {"done": true, "result": 2.0}

Контроль доступа по скоупам (RBAC)

Часто аутентифицированные пользователи имеют разные набор полномочий и могут вызывать разный набор эндпоинтов.
Для этого в спецификации OpenAPI предусмотрена возможность каждому эндпоинту указать необходимый набор скоупов(прав).
Рассмотрим пример простой очереди задач:

import asyncio
from http_tools import HttpServer
from init_helpers import raise_if
from openapi_tools import OpenApiServer, gen_json_endpoint
from openapi_tools import HttpBearerSecurityScheme, Unauthorized

tasks_query = []

def add_task(target: int) -> list[int]:
    tasks_query.append(target)
    return tasks_query

def pop_task() -> int | None:
    if tasks_query:
        return tasks_query.pop(0)

def reset_task() -> list[int]:
    if value := pop_task():
        add_task(value)
    return tasks_query

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))

    # Опишем resolver. Он должен возвращать информацию о пользователе и список его прав.
    # Будем возвращать None в качестве информации о пользователе, а набор прав брать из словаря
    def resolver(token: str) -> tuple[None, list[str]]:
        scopes = {'user_token': ['add'], 'worker_token': ['pop'], 'admin_token': ['add', 'pop']}.get(token)
        raise_if(scopes is None, Unauthorized(f'{token=} is invalid'))
        return None, scopes

    # Создадим схему безопасности, на основе HTTP bearer авторизации(передачи токена в хедере Authorisation)  
    http_auth = HttpBearerSecurityScheme[None](resolver=resolver)
    # Регистрируем эндпоинты с этой схемой, указывая требуемые скоупы
    api_server.register(gen_json_endpoint(add_task, "POST", "/task/add", [http_auth.has('add')]))
    api_server.register(gen_json_endpoint(pop_task, "POST", "/task/pop", [http_auth.has('pop')]))
    api_server.register(gen_json_endpoint(reset_task, "post", "/task/reset", [http_auth.has('add', 'pop')]))
    async with http_server:
        await asyncio.sleep(10 ** 10)


asyncio.run(main())

Добавляем задачу от пользователя

curl -X POST --location "http://0.0.0.0:8888/task/add" \
    -H "Authorization: Bearer user_token" -H "Content-Type: application/json" \
    -d '{"target": 42}'

Получим 200 OK: {"done": true, "result": [42]}
Попробуем от имени этого же пользователя извлечь задачу

curl -X POST --location "http://0.0.0.0:8888/task/pop" \
    -H "Authorization: Bearer user_token" 

Получим 403 с указанием, что проверен один вариант авторизации(http_bearer требующий права pop). Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена), т.к. не хватило прав(missing scopes), а именно: pop:

{
  "error": {
    "http_bearer[pop]": "Forbidden('missing scopes: pop')"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Запросим задачу от имени воркера:

curl -X POST --location "http://0.0.0.0:8888/task/pop" \
    -H "Authorization: Bearer worker_token" 

Получим 200 OK: {"done": true, "result": 42}
Добавим три задачи от админа

curl -X POST --location "http://0.0.0.0:8888/task/add" \
    -H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
    -d '{"target": 1}'
curl -X POST --location "http://0.0.0.0:8888/task/add" \
    -H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
    -d '{"target": 2}'
curl -X POST --location "http://0.0.0.0:8888/task/add" \
    -H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
    -d '{"target": 3}'

На последний запрос получим 200 OK: {"done": true, "result": [1, 2, 3]}
А теперь от имени админа сбросим первую задачу в конец списка

curl -X POST --location "http://0.0.0.0:8888/task/reset" \
    -H "Authorization: Bearer admin_token"

Получим 200 OK: {"done": true, "result": [2, 3, 1]} Попробуем сделать это от имени пользователя:

curl -X POST --location "http://0.0.0.0:8888/task/reset" \
    -H "Authorization: Bearer user_token"

Получим 403 с указанием, что проверен один вариант авторизации(http_bearer требующий прав add и pop). Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена), т.к. не хватило прав(missing scopes), а именно: pop:

{
  "error": {
    "http_bearer[add,pop]": "Forbidden('missing scopes: pop')"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Попробуем сделать это от имени воркера:

curl -X POST --location "http://0.0.0.0:8888/task/reset" \
    -H "Authorization: Bearer worker_token"

Получим 403 с указанием, что проверен один вариант авторизации(http_bearer требующий прав add и pop). Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена), т.к. не хватило прав(missing scopes), а именно: add:

{
  "error": {
    "http_bearer[add,pop]": "Forbidden('missing scopes: add')"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Контроль доступа по атрибутам пользователей и объектов (ABAC)

Часто в проектах требуется ограничение не только на уровне доступа до эндпоинтов, но и на уровне ограничения действий или объектов, обрабатываемых эндпоинтами.
Рассмотрим пример по мотивам системы доступа к камерам БР:

import asyncio
import datetime
import enum
import itertools
import operator
import time
from dataclasses import dataclass
from typing import Iterable

from http_tools import HttpServer, HttpStatusCode, StreamAnswer, FileAnswer
from http_tools.answer import ExceptionAnswer, ErrorCode
from http_tools.mime_types import ContentType
from init_helpers import raise_if, PickableDict

from openapi_tools import OpenApiServer, HttpBearerSecurityScheme, gen_query_endpoint, Unauthorized

# опишем объект доступа
@dataclass
class Camera:
    id: int
    region_id: int

cameras = [Camera(10, 1), Camera(20, 2), Camera(21, 2), Camera(30, 3), Camera(31, 3), Camera(32, 3)]

# опишем исключения
@dataclass
class CameraNotFound(Exception):
    camera_id: int

@dataclass
class MissingArchive(Exception):
    at: float

# Вспомогательная функция, т.к. всем эндпоинтам нужна камера по id с ограничением по доступным пользователю регионам
def _get_camera(camera_id: int, only_region_ids: Iterable[int] | None) -> Camera | None:
    only_cameras = cameras
    if only_region_ids is not None:
        only_region_ids = frozenset(only_region_ids)
        only_cameras = [c for c in cameras if c.region_id in only_region_ids]
    return next(iter(filter(lambda cam: cam.id == camera_id, only_cameras)), None)

async def watch_life_video(camera_id: int, only_region_ids: Iterable[int] | None) -> bytes:
    raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
    # Для упрощения примера не будем возвращать реальное видео, вернём текст
    for i in itertools.count(int(time.time() / .7)):
        yield f'<Camera {camera.id}, life video chunk {int(i)}>\n'.encode()
        await asyncio.sleep(.7)

async def watch_archive_video(camera_id: int, at: float, only_region_ids: Iterable[int] | None) -> bytes:
    raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
    raise_if(at >= time.time(), MissingArchive(at))
    # Для упрощения примера не будем возвращать реальное видео, вернём текст
    for i in itertools.count(int(at / .7)):
        yield f'<Camera {camera.id}, archive video chunk {int(i)}>\n'.encode()
        await asyncio.sleep(.7)

@dataclass
class File:
    content: bytes
    name: str

def export_archive_video(
        camera_id: int, at: float, duration: float, only_region_ids: Iterable[int] | None
) -> File:
    raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
    raise_if((till := at + duration) >= time.time(), MissingArchive(at + duration))
    # Для упрощения примера не будем возвращать реальное видео, вернём заглушку
    content = '\n'.join(f'<Camera {camera.id}, archive video chunk {i}>' for i in range(int(at / .7), int(till / .7)))
    to_iso = lambda x: datetime.datetime.utcfromtimestamp(x).isoformat().replace(':', '-')
    return File(content=content.encode(), name=f'cam_{camera_id}_{to_iso(at)}_{to_iso(till)}.mp4')

# Поскольку скоупы - это строки, то для удобства их стоит оформить в виде реестра строковых енумов
@enum.unique
class Scope(enum.StrEnum):
    watch_life = 'watch_life'
    watch_archive = 'watch_archive'
    export_archive = 'export_archive'

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    exception_to_answer = PickableDict({
        CameraNotFound: ExceptionAnswer[HttpStatusCode.NotFound][ErrorCode(1)],
        MissingArchive: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
    })

    def resolver(token: str) -> tuple[None, list[str]]:
        user_info = {
            'u1': {'scopes': [Scope.watch_life], 'regions': [1]},
            'u2': {'scopes': [Scope.watch_life, Scope.watch_archive], 'regions': [2]},
            'u3': {'scopes': [Scope.watch_archive, Scope.export_archive], 'regions': [2, 3]},
        }.get(token)
        raise_if(user_info is None, Unauthorized(f'{token=} is invalid'))
        return user_info, user_info['scopes']

    regions_getter = operator.itemgetter('regions')
    # Создадим схему безопасности, требующую передачу Bearer токена в Authorisation
    auth = HttpBearerSecurityScheme[None](resolver=resolver)
    api_server.register(gen_query_endpoint(
        watch_life_video, "get", "/camera/{camera_id}/video/life/watch",
        # Укажем, что эндпоинт доступен при наличии скоупа watch_life,
        # в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
        securities=[auth.has(Scope.watch_life, only_region_ids=regions_getter)],
        # Используем StreamAnswer, поскольку эндпоинт содержит yield
        answer_type=StreamAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
        exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound})
    ))
    api_server.register(gen_query_endpoint(
        watch_archive_video, "get", "/camera/{camera_id}/video/archive/watch",
        # Укажем, что эндпоинт доступен при наличии скоупа watch_archive,
        # в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
        securities=[auth.has(Scope.watch_archive, only_region_ids=regions_getter)],
        # Используем StreamAnswer, поскольку эндпоинт содержит yield
        answer_type=StreamAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
        exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound, MissingArchive})
    ))
    api_server.register(gen_query_endpoint(
        export_archive_video, "get", "/camera/{camera_id}/video/archive/export",
        # Укажем, что эндпоинт доступен при наличии скоупа export_archive,
        # в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
        securities=[auth.has(Scope.export_archive, only_region_ids=regions_getter)],
        answer_type=FileAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
        # Нужна фабрика, т.к. результат работы обработчика нельзя преобразовать в указанный тип ответа простой передачей
        answer_factory=lambda file: FileAnswer[HttpStatusCode.OK][ContentType.Mp4Video](file.content, file.name),
        exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound, MissingArchive})
    ))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Запросим живое видео с камеры 10 от имени первого пользователя, с флагом -v, чтобы увидеть детали ответа:

curl -v "http://0.0.0.0:8888/camera/10/video/life/watch" \
     -H "Authorization: Bearer u1"

Получим что-то вроде этого:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)

***
> GET /camera/10/video/life/watch HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u1
***
< HTTP/1.1 200 OK
< Content-Type: video/mp4
< Transfer-Encoding: chunked
< Date: ... GMT
< Server: Python/... aiohttp/...
< 
<Camera 10, life video chunk ...>
***

Поток "видео" будет идти, пока мы его не прервём.
Запросим живое видео с этой же камеры 10 от имени второго пользователя, у которого нет доступа к региону 1:

curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
     -H "Authorization: Bearer u2"

В ответ получим 404 Not Found с телом:

{
  "error": "10",
  "error_type": "CameraNotFound",
  "error_code": 1,
  "done": false
}

Запросим живое видео с этой же камеры 10 от имени третьего пользователя, у которого нет скоупа watch_life:

curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
     -H "Authorization: Bearer u3"

В ответ получим 403 Forbidden с телом:

{
  "error": {"http_bearer[watch_life]": "Forbidden('missing scopes: watch_life')"},
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Запросим живое видео с этой же камеры 10 от имени не существующего пользователя:

curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
     -H "Authorization: Bearer u4"

В ответ получим 401 Unauthorized с телом:

{
  "error": {"http_bearer[watch_life]": "Unauthorized(\"token='u4' is invalid\")"},
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Запросим архивное видео с камеры 10 от имени первого пользователя, у которого нет скоупа watch_archive:

curl "http://0.0.0.0:8888/camera/10/video/archive/watch?at=1000000" \
     -H "Authorization: Bearer u1"

В ответ получим 403 Forbidden с телом:

{
  "error": {"http_bearer[watch_archive]": "Forbidden('missing scopes: watch_archive')"},
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Запросим архивное видео с камеры 20 от имени второго пользователя:

curl -v "http://0.0.0.0:8888/camera/20/video/archive/watch?at=1000000" \
     -H "Authorization: Bearer u2"

Получим:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)

***
> GET /camera/20/video/archive/watch?at=1000000 HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u2
***
< HTTP/1.1 200 OK
< Content-Type: video/mp4
< Transfer-Encoding: chunked
< Date: ...
< Server: Python/... aiohttp/...
< 
<Camera 20, archive video chunk ...>
***

Поток "видео" будет идти, пока мы его не прервём.
Запросим экспорт архива с камеры 20 от имени третьего пользователя:

curl -v "http://0.0.0.0:8888/camera/20/video/archive/export?at=1000000&duration=3" \
     -H "Authorization: Bearer u3"

Получим в ответ файл cam_20_1970-01-12T13-46-40_1970-01-12T13-47-00.mp4:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)

***
> GET /camera/20/video/archive/export?at=1000000&duration=3 HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u3
***
< HTTP/1.1 200 OK
< Content-Disposition: attachment; filename="cam_20_1970-01-12T13-46-40_1970-01-12T13-46-43.mp4"
< Content-Type: video/mp4
< X-Request-ID: ...
< Instance: readme
< Content-Length: ...
< Date: ...
< Server: Python/... aiohttp/...
< 
<Camera 20, archive video chunk ...>
<Camera 20, archive video chunk ...>
***

Эндпоинты с несколькими механизмами доступа

Пример

Авторизация по OIDC (например, KeyCloak)

Пример