mentortools/libs/: openapi-tools-abm-5.1.65406 metadata and description

Simple index Newer version available

author Mike Orlov
author_email m.orlov@technokert.ru
classifiers
  • Programming Language :: Python :: 3
  • Programming Language :: Python :: 3.11
description_content_type text/markdown
requires_dist
  • aiohttp (>=3.8.3,<4.0.0)
  • apispec (>=6.6.1,<7.0.0)
  • async-tools-abm (>=2.1.61556,<3.0.0)
  • dict-caster-abm (>=1.0.49813,<2.0.0)
  • furl (>=2.1.3,<3.0.0)
  • http-tools-abm (>=5.6.65372,<6.0.0)
  • init-helpers-abm (>=2.1.64481,<3.0.0)
requires_python >=3.11,<4.0
File Tox results History
openapi_tools_abm-5.1.65406-py3-none-any.whl
Size
29 KB
Type
Python Wheel
Python
3
openapi_tools_abm-5.1.65406.tar.gz
Size
40 KB
Type
Source

OpenAPI tools

Библиотека позволяет без изменений в логике публиковать её методы в OpenAPI спецификации и HTTP сервере

Примеры использования

Минимальный пример: эндпоинт без аргументов

Допустим, мы хотим опубликовать эндпоинт по получению текущего времени

import time
import asyncio
from http_tools import HttpServer, JsonableAnswer, HttpStatusCode
from openapi_tools import OpenApiServer, RpcEndpoint

# функция, которую мы хотим опубликовать
def get_now():  # сигнатура функции может быть не типизирована 
    return int(time.time())

async def main():
    # бойлерплейт
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    # Регистрация эндпоинта
    api_server.register(RpcEndpoint(get_now, "GET", "/now", [], {}, JsonableAnswer[HttpStatusCode.OK][int]))
    # запуск сервера
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Можно проверить эндпоинт с помощью запроса из консоли:

curl --location "http://127.0.0.1:8888/now"

Так же можно получить спецификацию API:

curl --location "http://127.0.0.1:8888/open_api_specification/get"

Рассмотрим подробнее, код регистрации:
api_server.register(RpcEndpoint(get_now, "GET", "/now", [], {}, JsonableAnswer[HttpStatusCode.OK][int]))

В данном примере наш ответ имеет статус код 200 OK, тип содержимого application/json и структура - один int

Пример с параметрами

В реальности у эндпоинтов почти всегда есть входные параметры.
Допустим, мы хотим опубликовать метод по извлечению корня из целого числа

import math
import asyncio
from http_tools import HttpServer, WrappedAnswer, HttpStatusCode
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter

def sqrt(value: int) -> float:
    return math.sqrt(value)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(RpcEndpoint(
        sqrt, "GET", "/sqrt/{arg}", [], {
        # ожидаем параметр 'arg' типа int в HTTP пути запроса и передаём его в аргумент 'value' обработчика  
        'value': PathParameter(name='arg', schema=int)
    }, 
        # Мы используем подход, что в теле json ответов всегда dict, содержащий метаинформацию об ответе, 
        # например, поле 'done', а данные лежат под ключом result. 
        # Всю эту логику реализует WrappedAnswer, дальше будет использовать его       
        WrappedAnswer[HttpStatusCode.OK][float]
    ))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Можно проверить эндпоинт с помощью запроса из консоли:

curl --location "http://127.0.0.1:8888/sqrt/7"

Получим следующий ответ:

{"done": true, "result": 2.6457513110645907}

Если пошлём некорректный тип:

curl --location "http://127.0.0.1:8888/sqrt/bad"

то библиотека попробует скастить его в требуемый тип и мы получим следующий ответ:

{"error": "invalid literal for int() with base 10: 'bad'", "error_type": "ValueError", "error_code": null, "done": false}

Однако, если мы сделаем запрос с отрицательным числом

curl --location "http://127.0.0.1:8888/sqrt/-1"

то получим код 500 и довольно невнятную ошибку:

{"error": "math domain error", "error_type": "ValueError", "error_code": null, "done": false}

Это связано с тем, что мы не обрабатываем исключения, хотя они могут возникать

Пример обработки исключений

Доработаем обработку ошибок, для этого:

import math
import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter

def sqrt(value: int) -> float:
    # Если значение переменной value меньше 0, то будет выброшено исключение AssertionError
    assert value >= 0, f'Only non negative values allowed, got: {value}'
    return math.sqrt(value)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(RpcEndpoint(
        sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
        WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
        AssertionError: ExceptionAnswer[HttpStatusCode.BadRequest]
    }))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Проверим ответ на отрицательное число:

curl --location "http://127.0.0.1:8888/sqrt/-1"

Получим код 400 и более понятную ошибку:

{"error": "Only non negative values allowed, got: -1", "error_type": "AssertionError", "error_code": null, "done": false}

Если же требуется автоматизировать обработку различных ошибок на клиенте, то в ответе стоит использовать ErrorCode

Пример использования кодов ошибок

Для демонстрации допустим, что sqrt не может вычислять корень слишком больших значений Создадим отдельные исключения и для каждого из них укажем ErrorCode

import math
import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter

class TooBigValue(ValueError):
    pass

class NegativeValue(ValueError):
    pass

def sqrt(value: int) -> float:
    if value > 100:
        raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
    if value < 0:
        raise NegativeValue(f'Only non negative values allowed, got: {value}')
    return math.sqrt(value)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(RpcEndpoint(
        sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
        WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
        TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
        NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
    }))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Проверим ответ на отрицательное число:

curl --location "http://127.0.0.1:8888/sqrt/-1"

Получим код 400 и ошибку с кодом:

{"error": "Only non negative values allowed, got: -1", "error_type": "NegativeValue", "error_code": 2, "done": false}

Проверим ответ на большое число:

curl --location "http://127.0.0.1:8888/sqrt/111"

Получим код 400 и ошибку с кодом:

{"error": "Only values lower than 100 allowed, got: 111", "error_type": "TooBigValue", "error_code": 1, "done": false}

Однако, данный подход(коды ошибок указываются напрямую) может приводить к путанице при увеличении размеров API - в разных эндпоинтах один и тот же код может обозначать разное.

Пример наделения одного кода ошибки разными смыслами

Проиллюстрируем проблему, добавив ещё одну функцию бизнес-логики по возведению числа в степень, и опубликуем её в апи

import math
import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter

class TooBigValue(ValueError):
    pass

class NegativeValue(ValueError):
    pass

def sqrt(value: int) -> float:
    if value > 100:
        raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
    if value < 0:
        raise NegativeValue(f'Only non negative values allowed, got: {value}')
    return math.sqrt(value)

def power(base: float, exponent: float) -> float:
    if base < 0 and int(exponent) != exponent:
        raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
    return math.pow(base, exponent)


async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(RpcEndpoint(
        sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
        WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
        TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
        NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
    }))
    api_server.register(RpcEndpoint(
        power, "GET", "/pow/{base}/{exponent}", [], {
            'base': PathParameter(name='base', schema=float),
            'exponent': PathParameter(name='exponent', schema=float),
        },
        WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
        NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
    }))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Проверим эндпоинт /math/pow:

curl "http://127.0.0.1:8888/pow/-1/-2.1"

Получим код 400 и ошибку с кодом 1:

{"error": "With fractional exponent(-2.1) only non negative bases allowed, got: -1.0", "error_type": "NegativeValue", "error_code": 1, "done": false}

При этом получается, что одна и та же проблема в разных эндпоинтах имеет разные коды, что может путать разработчиков и усложнять код клиентского приложения.
Есть два подхода для составления API, более единообразного в плане кодов ошибок:

  1. Именованный реестр кодов ошибок
  2. Единый словарь обработки исключений

Пример использования именованного реестра кодов

Создадим enum со своим перечнем кодов ошибок, тогда при использовании это будут не просто числа, а именованные параметры

import enum
import math
import asyncio

import http_tools
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter

class TooBigValue(ValueError):
    pass

class NegativeValue(ValueError):
    pass

def sqrt(value: int) -> float:
    if value > 100:
        raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
    if value < 0:
        raise NegativeValue(f'Only non negative values allowed, got: {value}')
    return math.sqrt(value)

@enum.unique
class ErrorCode(http_tools.answer.ErrorCode, enum.Enum):
    negative_value = 1
    too_big_value = 2

def power(base: float, exponent: float) -> float:
    if base < 0 and int(exponent) != exponent:
        raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
    return math.pow(base, exponent)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(RpcEndpoint(
        sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
        WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
        TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.too_big_value],
        NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_value],
    }))
    api_server.register(RpcEndpoint(
        power, "GET", "/pow/{base}/{exponent}", [], {
            'base': PathParameter(name='base', schema=float),
            'exponent': PathParameter(name='exponent', schema=float),
        },
        WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
        NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_value],
    }))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Такой подход требует минимальных изменений и увеличивает читаемость, но и с ним возможны ошибки - одно и то же исключение в разных эндпоинтах может отображаться на разные коды

Пример единого словаря обработки исключений

Мы можем собрать единый словарь со всеми типами исключений и выбирать из него необходимые для каждого эндпоинта
ВНИМАНИЕ, передавать этот словарь целиком при каждой регистрации обработчика НЕЛЬЗЯ Это приведёт к формированию спецификации с огромным количеством лишних ответов

import math
import typing
import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter

class TooBigValue(ValueError):
    pass

class NegativeValue(ValueError):
    pass

def sqrt(value: int) -> float:
    if value > 100:
        raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
    if value < 0:
        raise NegativeValue(f'Only non negative values allowed, got: {value}')
    return math.sqrt(value)

def power(base: float, exponent: float) -> float:
    if base < 0 and int(exponent) != exponent:
        raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
    return math.pow(base, exponent)

class PickableDict(dict):
    def pick(self, keys: typing.Iterable) -> dict:
        """Возвращает новый словарь только с выбранными ключами."""
        return {k: self[k] for k in keys if k in self}

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    exception_to_answer = PickableDict({
        TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
        NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
    })
    api_server.register(RpcEndpoint(
        sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
        WrappedAnswer[HttpStatusCode.OK][float], exception_to_answer.pick({NegativeValue, TooBigValue})))
    api_server.register(RpcEndpoint(
        power, "GET", "/pow/{base}/{exponent}", [], {
            'base': PathParameter(name='base', schema=float),
            'exponent': PathParameter(name='exponent', schema=float),
        },
        WrappedAnswer[HttpStatusCode.OK][float], exception_to_answer.pick({NegativeValue})))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Этот способ на данный момент является рекомендуемым при использовании кодов ошибок в ответах

Пример предобработки аргументов обработчика

В прошлых примерах можно было увидеть, что параметры запроса прямо отображаются в аргументы обработчика, но иногда требуется предобработка.
Рассмотрим пример, где мы публикуем функцию, считающую количество знаков в строке
Но перед подсчётом мы хотим удалить пробелы в начале и в конце строки
Принимать строку будет из query части запроса
НЕРАБОЧИЙ пример, пояснение ниже

import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate

def get_len(value: str) -> int:
    return len(value)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(RpcEndpoint(
        get_len, "GET", "/len", [], {'value': QueryParameter(name='value', schema=str)},
        WrappedAnswer[HttpStatusCode.OK][int]
    ))
    api_server.register(RpcEndpoint(
        get_len, "GET", "/len/strip", [], {'value': CallTemplate(str.strip, QueryParameter(name='value', schema=str))},
        WrappedAnswer[HttpStatusCode.OK][int]
    ))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Это НЕРАБОЧИЙ пример: каждый эндпоинт имеет "имя"(operationId) и это имя должно быть уникальным на сервере.
По дефолту в качестве имени используется __name__ обработчика.
Поэтому, если один обработчик требуется зарегистрировать более одного раза, то требуется явно указывать его operation_id.
Перепишем пример:

import asyncio

from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate

def get_len(value: str) -> int:
    return len(value)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(RpcEndpoint(
        get_len, "GET", "/len", [], {'value': QueryParameter(name='value', schema=str)},
        WrappedAnswer[HttpStatusCode.OK][int]
    ))
    api_server.register(RpcEndpoint(
        get_len, "GET", "/len/strip", [], {'value': CallTemplate(str.strip, QueryParameter(name='value', schema=str))},
        WrappedAnswer[HttpStatusCode.OK][int], operation_id='get_len__stripped'
    ))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Проверим эндпоинт /len (пробел в урле обозначается символом '+'):

curl "http://127.0.0.1:8888/len?value=+test"

Получим {"done": true, "result": 5}
Проверим эндпоинт /len/strip:

curl "http://127.0.0.1:8888/len/strip?value=+test"

Получим {"done": true, "result": 4}

Пример более сложной предобработки аргументов обработчика

Рассмотрим пример, когда нам нужно опубликовать эндпоинт по расчёту расстояния между двумя точками (датаклассами) Датаклассы можно создать из json строки, переданной в query части запроса, но это преобразование нужно описывать явно
Или можем принимать их из json тела

import json
import dataclasses
import asyncio

from init_helpers.dict_to_dataclass import dict_to_dataclass
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate, JsonBodyParameter

@dataclasses.dataclass
class Point:
    x: int
    y: int

def get_dist(first: Point, second: Point) -> float:
    return ((first.x - second.x) ** 2 + (first.y - second.y) ** 2) ** 0.5

def parse_json_as_point(value: str) -> Point:
    return dict_to_dataclass(json.loads(value), Point)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(RpcEndpoint(
        get_dist, "GET", "/dist", [], {
            'first': CallTemplate(parse_json_as_point, QueryParameter(name='first', schema=str)),
            'second': CallTemplate(parse_json_as_point, QueryParameter(name='second', schema=str)),
        }, WrappedAnswer[HttpStatusCode.OK][float],
    ))
    api_server.register(RpcEndpoint(
        get_dist, "GET", "/dist/body", [], {
            'first': JsonBodyParameter(name='first', schema=Point),
            'second': JsonBodyParameter(name='second', schema=Point),
        }, WrappedAnswer[HttpStatusCode.OK][float], operation_id='get_dist_body'
    ))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Проверим эндпоинт /dist:

curl -X GET --location "http://127.0.0.1:8888/dist?first=%7B%22x%22%3A1%2C%22y%22%3A2%7D&second=%7B%22x%22%3A3%2C%22y%22%3A4%7D"

Проверим эндпоинт /dist/body:

curl -X GET --location "http://127.0.0.1:8888/dist/body" \
    -H "Content-Type: application/json" \
    -d '{"first":{"x":1,"y":2},"second":{"x":3,"y":4}}'

Ответ одинаковый для обоих запросов: {"done": true, "result": 2.8284271247461903}

Пример автогенерации парамтеров запроса и типа ответа

В прошлых примерах можно заметить, что если не используется предобработка, то параметры, указываемые при регистрации повторяют имена и типы аргументов обработчика, а ответ стоится из возвращаемого типа
Для сокращения кода есть функции, генерирующие объект RpcEndpoint на основе сигнатуры обработчика
Используем эндпоинты без предобработки из прошлых примеров (/len для расчёта длины строки и /dist для расстояния между точками)

import dataclasses
import asyncio

from http_tools import HttpServer
from openapi_tools import OpenApiServer, gen_json_rpc_endpoint, gen_query_rpc_endpoint

@dataclasses.dataclass
class Point:
    x: int
    y: int

def get_dist(first: Point, second: Point) -> float:
    return ((first.x - second.x) ** 2 + (first.y - second.y) ** 2) ** 0.5

def get_len(value: str) -> int:
    return len(value)

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    api_server.register(gen_json_rpc_endpoint(get_dist, "GET", "/dist", []))
    api_server.register(gen_query_rpc_endpoint(get_len, "GET", "/len", []))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Проверим эндпоинт /dist:

curl -X GET --location "http://127.0.0.1:8888/dist" \
    -H "Content-Type: application/json" \
    -d '{"first":{"x":1,"y":2},"second":{"x":3,"y":4}}'

Проверим эндпоинт /len:

curl "http://127.0.0.1:8888/len?value=test"

Аутентификация и авторизация

Базовая аутентификация

Во всех прошлых примерах можно заметить, что при генерации эндпоинта передаётся пустой массив.
Это список объектов, описывающих проверку прав доступа.
Чтобы ограничить доступ к какому-то эндпоинту достаточно при регистрации указать SecurityScheme или Security.
Грубо можно сказать, что SecurityScheme описывает аутентификацию: как проверяется секрет (токен/ключ), а Security - авторизацию: как проверяются права на вызов эндпоинта.

Рассмотрим простейший пример - аутентификацию по фиксированному секрету

import math
import asyncio
from typing import Tuple
from init_helpers import raise_if
from http_tools import HttpServer
from openapi_tools import OpenApiServer, ApiKeySecurityScheme, gen_query_rpc_endpoint, ParameterLocation, Unauthorized 

def sqrt(value: int) -> float:
    return math.sqrt(value)
    
async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    # Опишем resolver (способ проверки токена). Он должен возвращать информацию о пользователе и список его прав
    # Для простоты захардкодим токен, будем возвращать None в качестве информации о пользователе и пустой набор прав
    def resolver(token: str) -> Tuple[None,  list[str]]:
        raise_if(token != 'secret', Unauthorized(f'{token=} is invalid'))
        return None, []
    
    # Создадим схему безопасности, требующую передачу токена в query части запроса под именем 'name', применим resolver  
    api_key_scheme = ApiKeySecurityScheme[None](location=ParameterLocation.query, name='token', resolver=resolver)
    # Регистрируем эндпоинт с этой схемой
    api_server.register(gen_query_rpc_endpoint(sqrt, "get", "/sqrt", [api_key_scheme],))
    async with http_server:
        await asyncio.sleep(10 ** 10)

asyncio.run(main())

Делаем запрос без токена

curl "http://127.0.0.1:8888/sqrt?value=4"

Получаем в ответ 401 Unauthorized с телом. В теле описано, какие варианты авторизации были проверены(api_key_in_query_token[]) и почему не подошли(Unauthorized()):

{
  "error": {
    "api_key_in_query_token[]": "Unauthorized()"
  }, 
  "error_type": "ErrorGroup", 
  "error_code": null, 
  "done": false
}

Сделаем запрос с неверным токеном

curl "http://127.0.0.1:8888/sqrt?value=4&token=qqq"

Получаем в ответ 401 Unauthorized. В теле видим исключение из resolver'а(Unauthorized("token='qqq' is invalid")):

{
  "error": {
    "api_key_in_query_token[]": "Unauthorized(\"token='qqq' is invalid\")"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Сделаем запрос с верным токеном

curl "http://127.0.0.1:8888/sqrt?value=4&token=secret"

Получим 200 OK с телом {"done": true, "result": 2.0}

Контроль доступа по скоупам (RBAC)

Часто аутентифицированные пользователи имеют разные набор полномочий и могут вызывать разный набор эндпоинтов.
Для этого в спецификации OpenAPI предусмотрена возможность каждому эндпоинту указать необходимый набор скоупов(прав).
Рассмотрим пример простой очереди задач:

import asyncio
from typing import Tuple
from http_tools import HttpServer
from init_helpers import raise_if
from openapi_tools import OpenApiServer, gen_json_rpc_endpoint
from openapi_tools import HttpBearerSecurityScheme, Unauthorized

tasks_query = []

def add_task(target: int) -> list[int]:
    tasks_query.append(target)
    return tasks_query

def pop_task() -> int | None:
    if tasks_query:
        return tasks_query.pop(0)

def reset_task() -> list[int]:
    if value := pop_task():
        add_task(value)
    return tasks_query

async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))

    # Опишем resolver. Он должен возвращать информацию о пользователе и список его прав.
    # Будем возвращать None в качестве информации о пользователе, а набор прав брать из словаря
    def resolver(token: str) -> Tuple[None, list[str]]:
        scopes = {'user_token': ['add'], 'worker_token': ['pop'], 'admin_token': ['add', 'pop']}.get(token)
        raise_if(scopes is None, Unauthorized(f'{token=} is invalid'))
        return None, scopes

    # Создадим схему безопасности, на основе HTTP bearer авторизации(передачи токена в хедере Authorisation)  
    http_auth = HttpBearerSecurityScheme[None](resolver=resolver)
    # Регистрируем эндпоинты с этой схемой, указывая требуемые скоупы
    api_server.register(gen_json_rpc_endpoint(add_task, "POST", "/task/add", [http_auth.has('add')]))
    api_server.register(gen_json_rpc_endpoint(pop_task, "POST", "/task/pop", [http_auth.has('pop')]))
    api_server.register(gen_json_rpc_endpoint(reset_task, "post", "/task/reset", [http_auth.has('add', 'pop')]))
    async with http_server:
        await asyncio.sleep(10 ** 10)


asyncio.run(main())

Добавляем задачу от пользователя

curl -X POST --location "http://127.0.0.1:8888/task/add" \
    -H "Authorization: Bearer user_token" -H "Content-Type: application/json" \
    -d '{"target": 42}'

Получим 200 OK: {"done": true, "result": [42]}
Попробуем от имени этого же пользователя извлечь задачу

curl -X POST --location "http://127.0.0.1:8888/task/pop" \
    -H "Authorization: Bearer user_token" 

Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий права pop). Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена), т.к. не хватило прав(missing scopes), а именно: pop:

{
  "error": {
    "http_Bearer[pop]": "Forbidden('missing scopes: pop')"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Запросим задачу от имени воркера:

curl -X POST --location "http://127.0.0.1:8888/task/pop" \
    -H "Authorization: Bearer worker_token" 

Получим 200 OK: {"done": true, "result": 42}
Добавим три задачи от админа

curl -X POST --location "http://127.0.0.1:8888/task/add" \
    -H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
    -d '{"target": 1}'
curl -X POST --location "http://127.0.0.1:8888/task/add" \
    -H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
    -d '{"target": 2}'
curl -X POST --location "http://127.0.0.1:8888/task/add" \
    -H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
    -d '{"target": 3}'

На последний запрос получим 200 OK: {"done": true, "result": [1, 2, 3]}
А теперь от имени админа сбросим первую задачу в конец списка

curl -X POST --location "http://127.0.0.1:8888/task/reset" \
    -H "Authorization: Bearer admin_token"

Получим 200 OK: {"done": true, "result": [2, 3, 1]} Попробуем сделать это от имени пользователя:

curl -X POST --location "http://127.0.0.1:8888/task/reset" \
    -H "Authorization: Bearer user_token"

Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий прав add и pop). Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена), т.к. не хватило прав(missing scopes), а именно: pop:

{
  "error": {
    "http_Bearer[add,pop]": "Forbidden('missing scopes: pop')"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Попробуем сделать это от имени воркера:

curl -X POST --location "http://127.0.0.1:8888/task/reset" \
    -H "Authorization: Bearer worker_token"

Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий прав add и pop). Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена), т.к. не хватило прав(missing scopes), а именно: add:

{
  "error": {
    "http_Bearer[add,pop]": "Forbidden('missing scopes: add')"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Контроль доступа по атрибутам пользователей и объектов (ABAC)

Часто в проектах требуется ограничение не только на уровне доступа до эндпоинтов, но и на уровне ограничения действий или объектов, обрабатываемых эндпоинтами
Рассмотрим пример по мотивам системы доступа к камерам БР:

import asyncio
import datetime
import enum
import itertools
import operator
import time
from dataclasses import dataclass
from typing import Iterable, Tuple

from http_tools import HttpServer, HttpStatusCode, StreamAnswer, FileAnswer
from http_tools.answer import ExceptionAnswer, ErrorCode
from http_tools.mime_types import ContentType
from init_helpers import raise_if

from openapi_tools import OpenApiServer, HttpBearerSecurityScheme, gen_query_rpc_endpoint, Unauthorized, PickableDict


# опишем объект доступа
@dataclass
class Camera:
    id: int
    region_id: int


cameras = [Camera(10, 1), Camera(20, 2), Camera(21, 2), Camera(30, 3), Camera(31, 3), Camera(32, 3)]


# опишем исключения
@dataclass
class CameraNotFound(Exception):
    camera_id: int


@dataclass
class MissingArchive(Exception):
    at: float


# Вспомогательная функция, т.к. всем эндпоинтам нужна камера по id с ограничением по доступным пользователю регионам
def _get_camera(camera_id: int, only_region_ids: Iterable[int] | None) -> Camera | None:
    only_cameras = cameras
    if only_region_ids is not None:
        only_region_ids = frozenset(only_region_ids)
        only_cameras = [c for c in cameras if c.region_id in only_region_ids]
    return next(iter(filter(lambda cam: cam.id == camera_id, only_cameras)), None)


async def watch_life_video(camera_id: int, only_region_ids: Iterable[int] | None) -> bytes:
    raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
    # Для упрощения примера не будем возвращать реальное видео, вернём текст
    for i in itertools.count(int(time.time() / 5)):
        yield f'<Camera {camera.id}, chunk {int(i)} of life video>\n'.encode()
        await asyncio.sleep(5)


async def watch_archive_video(camera_id: int, at: float, only_region_ids: Iterable[int] | None) -> bytes:
    raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
    raise_if(at >= time.time(), MissingArchive(at))
    # Для упрощения примера не будем возвращать реальное видео, вернём текст
    for i in itertools.count(int(at / 5)):
        yield f'<Camera {camera.id}, chunk {int(i)} of archive video>\n'.encode()
        await asyncio.sleep(5)


@dataclass
class File:
    content: bytes
    name: str


def export_archive_video(
        camera_id: int, at: float, duration: float, only_region_ids: Iterable[int] | None
) -> File:
    raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
    raise_if((till := at + duration) >= time.time(), MissingArchive(at + duration))
    # Для упрощения примера не будем возвращать реальное видео, вернём заглушку
    content = '\n'.join(f'<Camera {camera.id}, chunk {i} of archive video>' for i in range(int(at / 5), int(till / 5)))
    to_iso = lambda x: datetime.datetime.fromtimestamp(x).isoformat().replace(':', '-')
    return File(content=content.encode(), name=f'cam_{camera_id}_{to_iso(at)}_{to_iso(till)}.mp4')


# Поскольку скоупы - это строки, то для удобства их стоит оформить в виде реестра строковых енумов
@enum.unique
class Scope(enum.StrEnum):
    watch_life = 'watch_life'
    watch_archive = 'watch_archive'
    export_archive = 'export_archive'


async def main():
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    exception_to_answer = PickableDict({
        CameraNotFound: ExceptionAnswer[HttpStatusCode.NotFound][ErrorCode(1)],
        MissingArchive: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
    })

    def resolver(token: str) -> Tuple[None, list[str]]:
        user_info = {
            'u1': {'scopes': [Scope.watch_life], 'regions': [1]},
            'u2': {'scopes': [Scope.watch_life, Scope.watch_archive], 'regions': [2]},
            'u3': {'scopes': [Scope.watch_archive, Scope.export_archive], 'regions': [2, 3]},
        }.get(token)
        raise_if(user_info is None, Unauthorized(f'{token=} is invalid'))
        return user_info, user_info['scopes']

    regions_getter = operator.itemgetter('regions')
    # Создадим схему безопасности, требующую передачу Bearer токена в Authorisation
    auth = HttpBearerSecurityScheme[None](resolver=resolver)
    api_server.register(gen_query_rpc_endpoint(
        watch_life_video, "get", "/camera/{camera_id}/video/life/watch",
        # Укажем, что эндпоинт доступен при наличии скоупа watch_life,
        # в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
        securities=[auth.has(Scope.watch_life, only_region_ids=regions_getter)],
        # Используем StreamAnswer, поскольку эндпоинт содержит yield
        answer_type=StreamAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
        exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound})
    ))
    api_server.register(gen_query_rpc_endpoint(
        watch_archive_video, "get", "/camera/{camera_id}/video/archive/watch",
        # Укажем, что эндпоинт доступен при наличии скоупа watch_archive,
        # в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
        securities=[auth.has(Scope.watch_archive, only_region_ids=regions_getter)],
        # Используем StreamAnswer, поскольку эндпоинт содержит yield
        answer_type=StreamAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
        exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound, MissingArchive})
    ))
    api_server.register(gen_query_rpc_endpoint(
        export_archive_video, "get", "/camera/{camera_id}/video/archive/export",
        # Укажем, что эндпоинт доступен при наличии скоупа export_archive,
        # в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
        securities=[auth.has(Scope.export_archive, only_region_ids=regions_getter)],
        answer_type=FileAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
        # Нужна фабрика, т.к. результат работы обработчика нельзя преобразовать в указанный тип ответа простой передачей
        answer_factory=lambda file: FileAnswer[HttpStatusCode.OK][ContentType.Mp4Video](file.content, file.name),
        exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound, MissingArchive})
    ))
    async with http_server:
        await asyncio.sleep(10 ** 10)


asyncio.run(main())

Запросим живое видео с камеры 10 от имени первого пользователя, с флагом -v, чтобы увидеть детали ответа:

curl -v "http://127.0.0.1:8888/camera/10/video/life/watch" \
     -H "Authorization: Bearer u1"

Получим что-то вроде этого:

*   Trying 127.0.0.1:8888...
* Connected to 127.0.0.1 (127.0.0.1) port 8888 (#0)
> GET /camera/10/video/life/watch HTTP/1.1
> Host: 127.0.0.1:8888
> User-Agent: curl/7.88.1
> Accept: */*
> Authorization: Bearer u1
> 
< HTTP/1.1 200 OK
< Content-Type: video/mp4
< Transfer-Encoding: chunked
< Date: Tue, 08 Oct 2024 08:03:30 GMT
< Server: Python/3.11 aiohttp/3.10.5
< 
<Camera 10, chunk 345674922 of life video>
<Camera 10, chunk 345674923 of life video>
<Camera 10, chunk 345674924 of life video>
<Camera 10, chunk 345674925 of life video>
^C

Поток "видео" будет идти, пока мы его не прервём.
Запросим живое видео с этой же камеры 10 от имени второго пользователя, у которого нет доступа к региону 1:

curl -v "http://127.0.0.1:8888/camera/10/video/life/watch" \
     -H "Authorization: Bearer u3"

В ответ получим 404 Not Found с телом:

{
  "error": "10",
  "error_type": "CameraNotFound",
  "error_code": 1,
  "done": false
}

Запросим живое видео с этой же камеры 10 от имени третьего пользователя, у которого нет скоупа watch_life:

curl -v "http://127.0.0.1:8888/camera/10/video/life/watch" \
     -H "Authorization: Bearer u3"

В ответ получим 403 Forbidden с телом:

{
  "error": {"http_Bearer[watch_life]": "Forbidden('missing scopes: watch_life')"},
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Запросим живое видео с этой же камеры 10 от имени не существующего пользователя:

curl -v "http://127.0.0.1:8888/camera/10/video/life/watch" \
     -H "Authorization: Bearer u4"

В ответ получим 401 Unauthorized с телом:

{
  "error": {"http_Bearer[watch_life]": "Unauthorized(\"token='u4' is invalid\")"},
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Запросим архивное видео с камеры 10 от имени первого пользователя, у которого нет скоупа watch_archive:

curl -v "http://127.0.0.1:8888/camera/10/video/archive/watch?at=1000000" \
     -H "Authorization: Bearer u1"

В ответ получим 403 Forbidden с телом:

{
  "error": {"http_Bearer[watch_archive]": "Forbidden('missing scopes: watch_archive')"},
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Запросим архивное видео с камеры 20 от имени второго пользователя:

curl -v "http://127.0.0.1:8888/camera/20/video/archive/watch?at=1000000" \
     -H "Authorization: Bearer u2"

Получим:

* Connected to 127.0.0.1 (127.0.0.1) port 8888 (#0)
> GET /camera/20/video/archive/watch?at=1000000 HTTP/1.1
> Host: 127.0.0.1:8888
> User-Agent: curl/7.88.1
> Accept: */*
> Authorization: Bearer u2
> 
< HTTP/1.1 200 OK
< Content-Type: video/mp4
< Transfer-Encoding: chunked
< Date: Tue, 08 Oct 2024 08:49:51 GMT
< Server: Python/3.11 aiohttp/3.10.5
< 
<Camera 20, chunk 200000 of archive video>
<Camera 20, chunk 200001 of archive video>
^C

Поток "видео" будет идти, пока мы его не прервём.
Запросим экспорт архива с камеры 20 от имени третьего пользователя:

curl -v "http://127.0.0.1:8888/camera/20/video/archive/export?at=1000000&duration=20" \
     -H "Authorization: Bearer u3"

Получим в ответ файл cam_20_1970-01-12T16-46-40_1970-01-12T16-47-00.mp4:

*   Trying 127.0.0.1:8888...
* Connected to 127.0.0.1 (127.0.0.1) port 8888 (#0)
> GET /camera/20/video/archive/export?at=1000000&duration=20 HTTP/1.1
> Host: 127.0.0.1:8888
> User-Agent: curl/7.88.1
> Accept: */*
> Authorization: Bearer u3
> 
< HTTP/1.1 200 OK
< Content-Disposition: attachment; filename="cam_20_1970-01-12T16-46-40_1970-01-12T16-47-00.mp4"
< Content-Type: video/mp4
< X-Request-ID: a3a8f3c0a020633d965cd99903877e87-127.0.0.1-1728379985.1459343
< Instance: readme
< Content-Length: 171
< Date: Tue, 08 Oct 2024 09:33:05 GMT
< Server: Python/3.11 aiohttp/3.10.5
< 
<Camera 20, chunk 200000 of archive video>
<Camera 20, chunk 200001 of archive video>
<Camera 20, chunk 200002 of archive video>
<Camera 20, chunk 200003 of archive video>
* Connection #0 to host 127.0.0.1 left intact

Эндпоинты с несколькими механизмами доступа

Бывает, что один и тот же эндпоинт должен накладывать разные ограничения или даже не накладывать никаких, в зависимости от аутентификационной и/или авторизационной информации.
Например, если пользователи могут работать со "своими" ресурсами и есть админ, имеющий доступ ко всем ресурсам.
Или если к этому сервису могут обращаться другие сервисы.
Рассмотрим следующий пример:

ВНИМАНИЕ! Порядок объектов в security ВАЖЕН: будет использоваться ПЕРВЫЙ подходящий

import asyncio
import contextlib
import csv
import enum
import io
import operator
from typing import Literal, Any, TypedDict, Tuple, TypeAlias

import sqlalchemy
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, FileAnswer, WrappedAnswer
from http_tools.mime_types import ContentType
from init_helpers import raise_if
from sqlalchemy import select, Integer, String
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column

from openapi_tools import ApiKeySecurityScheme, ParameterLocation, OpenApiServer, gen_query_rpc_endpoint, \
    gen_json_rpc_endpoint, RpcEndpoint, Unauthorized, RawBodyParameter, PickableDict

# Реестр сущностей SQLAlchemy
sqlalchemy_mapper_registry = sqlalchemy.orm.registry()
# Алиас для не гарантированного объекта сессии (будет понятно дальше)
MaybeSession: TypeAlias = AsyncSession | None

# Опишем сущность заметки
@sqlalchemy_mapper_registry.mapped_as_dataclass
class Note:
    __tablename__ = 'notes'
    id: Mapped[int | None] = mapped_column(Integer, primary_key=True, init=False)
    content: Mapped[str] = mapped_column(String, nullable=False)
    author_id: Mapped[int | None] = mapped_column(Integer, nullable=True, index=True)

# helper для удобства переиспользования методов (будет видно ниже)
@contextlib.asynccontextmanager
async def ignore_async_with(value: Any):
    yield value

class Controller:
    def __init__(self, session_maker: sessionmaker):
        self.session_maker = session_maker

    # Метод для возвращения сессии БД, при получении существующей сессии, оборачивает её в helper для скипа async with
    def ensure_session(self, session: MaybeSession = None) -> AsyncSession:
        return self.session_maker() if session is None else ignore_async_with(session)

    # Здесь и далее аргумент session позволяет управлять, создаётся объект в новой сессии БД или в существующей
    async def create_note(self, content: str, author_id: int | None, session: MaybeSession = None) -> Note:
        async with self.ensure_session(session) as session:
            session.add(new_note := Note(content=content, author_id=author_id))
            await session.commit()
            return new_note

    # Здесь и далее аргумент only_author_id используется для ограничения доступа
    async def get_notes(self, only_author_id: int | None, limit: int | None = 100, offset: int = 0) -> list[Note]:
        async with self.ensure_session() as session:
            query = select(Note).limit(limit).offset(offset)
            query = query if only_author_id is None else query.where(Note.author_id == only_author_id)
            return list((await session.execute(query)).scalars())

    async def export_notes(self, only_author_id: int) -> Tuple[bytes, str]:
        notes = await self.get_notes(only_author_id=only_author_id, limit=None, offset=0)
        writer = csv.writer(output := io.StringIO())
        writer.writerow(['id', 'content', 'author_id'])
        [writer.writerow([note.id, note.content, note.author_id]) for note in notes]
        return output.getvalue().encode(), 'notes.csv'

    async def get_note(self, note_id: int, only_author_id: int | None, session: MaybeSession = None) -> Note:
        query = select(Note).where(Note.id == note_id)
        query = query if only_author_id is None else query.where(Note.author_id == only_author_id)
        async with self.ensure_session(session) as session:
            return (await session.execute(query)).scalar_one()

    async def copy_note(self, note_id: int, only_author_id: int | None, author_id: int | None) -> Note:
        async with self.ensure_session() as session:
            old_note = await self.get_note(note_id=note_id, only_author_id=only_author_id, session=session)
            new_note = await self.create_note(content=old_note.content, author_id=author_id, session=session)
            await session.commit()
            return new_note

    async def update_note(self, note_id: int, content: str, only_author_id: int | None) -> Note:
        async with self.ensure_session() as session:
            # используется метод, описанный выше
            note = await self.get_note(note_id=note_id, only_author_id=only_author_id, session=session)
            note.content = content
            await session.commit()
            return note

    async def delete_note(self, note_id: int, only_author_id: int | None) -> None:
        async with self.ensure_session() as session:
            note = await self.get_note(note_id=note_id, only_author_id=only_author_id, session=session)
            await session.delete(note)
            await session.commit()

    async def import_notes(self, csv_file: bytes, author_id: int | None) -> list[int]:
        # Не хочу писать реальный парсер, буду вне зависимости от содержимого создавать одну заметку
        async with self.ensure_session() as session:
            new_note = await self.create_note(content=str(len(csv_file)), author_id=author_id, session=session)
            await session.commit()
        return [new_note.id]

async def main():
    engine = create_async_engine("sqlite+aiosqlite:///./test.db", echo=True)
    async with engine.begin() as conn:
        await conn.run_sync(sqlalchemy_mapper_registry.metadata.create_all)
    http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
    api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
    # noinspection PyTypeChecker
    controller = Controller(sessionmaker(engine, expire_on_commit=False, class_=AsyncSession))

    # Опишем обработку исключений
    exception_to_answer = PickableDict({
        sqlalchemy.exc.NoResultFound: ExceptionAnswer[HttpStatusCode.NotFound],
    })

    # Опишем скоупы в виде enum
    class Scope(enum.StrEnum):
        create = 'create'  # право на создание
        read_own = 'read_own'  # право на чтение своих заметок
        read_any = 'read_any'  # право на чтение любых заметок
        update_own = 'update_own'  # право на модификацию своих заметок
        update_any = 'update_any'  # право на модификацию любых заметок
        delete_own = 'delete_own'  # право на удаление своих заметок
        delete_any = 'delete_any'  # право на удаление любых заметок
        other = 'other'  # иное право(не испольузется)

    # Опишем структуру информации о пользователе, получаемую при аутентификации по токену
    class UserInfo(TypedDict):
        user_id: int
        scope: list[str]

    # Описываем callable для получения id пользователя из UserInfo
    get_user_id = operator.itemgetter('user_id')

    # Опишем структуру информации о пользователе, получаемую при аутентификации с помощью хедера
    class EmptyDict(TypedDict):
        pass

    # функция, подменяющая внешнюю авторизационную систему в нашем примере
    # сигнатура всегда такая - возвращает кортеж из информации о пользователе и доступных скоупах
    def resolver(token: str) -> tuple[UserInfo, list[Scope]]:
        auth_info = {
            'first_token': {'user_id': 1, 'scope': [Scope.create, Scope.read_own, Scope.update_own, Scope.delete_own]},
            'second_token': {'user_id': 2, 'scope': [Scope.create, Scope.read_own, Scope.update_own, Scope.delete_own]},
            'admin_token': {'user_id': 3, 'scope': [Scope.create, Scope.read_any, Scope.update_any, Scope.delete_any]},
            'other_token': {'user_id': 5, 'scope': [Scope.other]},
            'reader_token': {'user_id': 6, 'scope': [Scope.read_any]},
        }.get(token)
        raise_if(auth_info is None, Unauthorized(f'{token=} is invalid'))
        return auth_info, auth_info['scope']

    # Опишем основную схему авторизации: через токен в query
    query_auth = ApiKeySecurityScheme[UserInfo](location=ParameterLocation.query, name='token', resolver=resolver)

    # Опишем дополнительную схему авторизации через хедер.
    # Мы не хотим сообщать о её наличии, поэтому добавляем do_log=False
    header_auth = ApiKeySecurityScheme[EmptyDict](
        location=ParameterLocation.header, name='server_name', resolver=lambda x: ({}, list[Scope]), do_log=False)

    # Зарегистрируем метод создания заметки
    api_server.register(gen_json_rpc_endpoint(
        controller.create_note, "POST", "/notes",
        # Все security обязаны иметь одинаковый перечень ключевых аргументов, даже если у обработчика есть дефолт
        securities=[
            # При обработке запросов, аутентифицированных через хедер, в значении аргумента author_id используем None
            header_auth.has(author_id=Literal[None]),
            # При обработке запросов, аутентифицированных через query, в значении аргумента author_id используем user_id
            query_auth.has(Scope.create, author_id=get_user_id)
        ], arg_name_to_param={
            'session': None  # При автогенерации нужно указать, что мы не хотим публиковать аргумент session
        }
    ))

    api_server.register(gen_query_rpc_endpoint(
        controller.get_notes, "GET", "/notes", securities=[
            # При аутентификации через хедер, передаём None в only_author_id, чем снимаем ограничения на видимые заметки
            header_auth.has(only_author_id=Literal[None]),
            # При аутентификации через query, если есть скоуп read_any, то тоже передаём None в only_author_id
            query_auth.has(Scope.read_any, only_author_id=Literal[None]),
            # При аутентификации через query, если прошлый метод авторизации не сработал, т.е. нет скоупа read_any,
            #   то передаём значение user_id в only_author_id, тем самым ограничивая видимые заметки
            query_auth.has(Scope.read_own, only_author_id=get_user_id),
        ], exception_type_to_answer_type={}
    ))

    # Серверам и юзерам с read_any доступны для чтения все заметки, с read_own - только свои
    api_server.register(gen_query_rpc_endpoint(
        controller.get_note, "GET", "/notes/{note_id}", securities=[
            header_auth.has(only_author_id=Literal[None]),
            query_auth.has(Scope.read_any, only_author_id=Literal[None]),
            query_auth.has(Scope.read_own, only_author_id=get_user_id),
        ], arg_name_to_param={'session': None},
        exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
    ))

    # Серверам и юзерам с read_any доступны для чтения все заметки, с read_own - только свои
    api_server.register(RpcEndpoint(
        controller.export_notes, "GET", "/notes/export", securities=[
            header_auth.has(only_author_id=Literal[None]),
            query_auth.has(Scope.read_any, only_author_id=Literal[None]),
            query_auth.has(Scope.read_own, only_author_id=get_user_id),
        ], arg_name_to_param={},
        answer_type=FileAnswer[HttpStatusCode.OK][ContentType.CSV],
        # Нужно указать answer_factory, т.к. класс FileAnswer.__init__ не принимает напрямую Tuple[Bytes, str]
        answer_factory=lambda x: FileAnswer[HttpStatusCode.OK][ContentType.CSV](payload=x[0], file_name=x[1]),
        exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
    ))

    # Сервера создают заметки без авторства, юзеры с create - со своим авторством
    api_server.register(RpcEndpoint(
        controller.import_notes, "POST", "/notes/import", securities=[
            header_auth.has(author_id=Literal[None]),
            query_auth.has(Scope.create, author_id=get_user_id),
        ], arg_name_to_param={'csv_file': RawBodyParameter[ContentType.CSV]},
        answer_type=WrappedAnswer[HttpStatusCode.OK][list[int]],
        exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
    ))

    # Серверам и юзерам с update_any доступны для изменения все заметки, с update_own - только свои
    api_server.register(gen_json_rpc_endpoint(
        controller.update_note, "PUT", "/notes/{note_id}", securities=[
            header_auth.has(only_author_id=Literal[None]),
            query_auth.has(Scope.update_any, only_author_id=Literal[None]),
            query_auth.has(Scope.update_own, only_author_id=get_user_id),
        ],
        exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
    ))

    # Серверам и юзерам с delete_any доступны для удаления все заметки, с delete_own - только свои
    api_server.register(gen_query_rpc_endpoint(
        controller.delete_note, "DELETE", "/notes/{note_id}", securities=[
            header_auth.has(only_author_id=Literal[None]),
            query_auth.has(Scope.delete_any, only_author_id=Literal[None]),
            query_auth.has(Scope.delete_own, only_author_id=get_user_id),
        ],
        exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
    ))

    # Серверам и юзерам с read_any&create доступны для копирования все заметки, с read_own&create - только свои
    api_server.register(gen_query_rpc_endpoint(
        controller.copy_note, "POST", "/notes/{note_id}", securities=[
            header_auth.has(only_author_id=Literal[None], author_id=Literal[None]),
            query_auth.has(Scope.read_any, Scope.create, only_author_id=Literal[None], author_id=get_user_id),
            query_auth.has(Scope.read_own, Scope.create, only_author_id=get_user_id, author_id=get_user_id),
        ],
        exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
    ))

    async with http_server:
        await asyncio.sleep(10 ** 10)

if __name__ == '__main__':
    asyncio.run(main())

Запросим список заметок от имени админа:

curl "http://127.0.0.1:8888/notes?token=admin_token"

Получим 200 OK с телом: {"done": true, "result": []} (заметок нет)
Создадим заметку от первого пользователя:

curl -X POST --location "http://0.0.0.0:8888/notes?token=first_token" \
    -H "Content-Type: application/json" \
    -d '{"content": "111"}'

Получим 200 OK с телом:

{"done": true, "result": {"id": 1, "content": "111", "author_id": 1}}

Запросим список заметок от имени первого пользователя:

curl "http://127.0.0.1:8888/notes?token=first_token"

Получим 200 OK с телом:

{"done": true, "result": [{"id": 1, "content": "111", "author_id": 1}]}

Запросим список заметок от имени второго пользователя:

curl "http://127.0.0.1:8888/notes?token=second_token"

Получим 200 OK с телом:

{"done": true, "result": []}

Создадим заметку от второго пользователя:

curl -X POST --location "http://0.0.0.0:8888/notes?token=second_token" \
    -H "Content-Type: application/json" \
    -d '{"content": "222"}'

Получим 200 OK с телом:

{"done": true, "result": {"id": 2, "content": "222", "author_id": 2}}

Запросим список заметок от имени другого сервера, аутентифицируясь через хедер server_name:

curl "http://127.0.0.1:8888/notes" -H "server_name: other"

Получим 200 OK с телом:

{"done": true, "result": [
  {"id": 1, "content": "111", "author_id": 1}, 
  {"id": 2, "content": "222", "author_id": 2}
]}

Модифицируем заметку 2 от имени админа:

curl -X PUT --location "http://0.0.0.0:8888/notes/2?token=admin_token" \
    -H "Content-Type: application/json" \
    -d '{"content": "qqq"}'

Получим 200 OK с телом:

{"done": true, "result": {"id": 2, "content": "qqq", "author_id": 2}}

Запросим список заметок от имени читателя:

curl "http://127.0.0.1:8888/notes?token=reader_token"

Получим 200 OK с телом:

{"done": true, "result": [
  {"id": 1, "content": "111", "author_id": 1}, 
  {"id": 2, "content": "qqq", "author_id": 2}
]}

Попробуем модифицировать заметку 2 от имени читателя:

curl -v -X PUT --location "http://0.0.0.0:8888/notes/2?token=reader_token" \
    -H "Content-Type: application/json" \
    -d '{"content": "fail"}'

Получим 403 Forbidden с телом, указывающим, что есть два варианта авторизации(серверный скрыт из-за do_log=False), в первом пользователю нехватает скоупа update_any, во втором - update_own

{
  "error": {
    "api_key_in_query_token[update_any]": "Forbidden('missing scopes: update_any')",
    "api_key_in_query_token[update_own]": "Forbidden('missing scopes: update_own')"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Попробуем модифицировать заметку 2 от имени первого пользователя (у него ведь есть скоуп update_own):

curl -v -X PUT --location "http://0.0.0.0:8888/notes/2?token=first_token" \
    -H "Content-Type: application/json" \
    -d '{"content": "fail"}'

Получим 404 Not Found с телом, указывающим, что такой заявки нет (она невидима первого пользователя, т.к. принадлежит второму пользователю):

{
  "error": "No row was found when one was required",
  "error_type": "NoResultFound",
  "error_code": null,
  "done": false
}

Модифицируем заметку 1 от имени первого пользователя:

curl -v -X PUT --location "http://0.0.0.0:8888/notes/1?token=first_token" \
    -H "Content-Type: application/json" \
    -d '{"content": "success"}'

Получим 404 Not Found с телом, указывающим, что такой заявки нет (она невидима первого пользователя, т.к. принадлежит второму пользователю):

{"done": true, "result": {"id": 1, "content": "success", "author_id": 1}}

Скопируем заметку 1 от имени админа:

curl -X POST --location "http://0.0.0.0:8888/notes/1?token=admin_token"

Получим 200 OK с телом:

{"done": true, "result": {"id": 3, "content": "success", "author_id": 3}}

Удалим первоую заявку от имени админа:

curl -X DELETE --location "http://0.0.0.0:8888/notes/1?token=admin_token"

Получим 200 OK с телом:

{"done": true}

Попробуем скопировать заметку 3 от имени читателя:

curl -v -X POST --location "http://0.0.0.0:8888/notes/3?token=reader_token"

Получим 403 Forbidden с телом, указывающим, что есть два варианта авторизации(серверный скрыт из-за do_log=False), в первом пользователю нехватает скоупа create, во втором - скоупов create и read_own

{
  "error": {
    "api_key_in_query_token[create,read_any]": "Forbidden('missing scopes: create')",
    "api_key_in_query_token[read_own,create]": "Forbidden('missing scopes: create, read_own')"
  },
  "error_type": "ErrorGroup",
  "error_code": null,
  "done": false
}

Импортируем заметку на основе файла от имени сервера:

curl -X POST --location "http://0.0.0.0:8888/notes/import" \
    -H "server_name: other" \
    -H "Content-Type: text/csv" \
    -d 'test,passed'

Получим 200 OK с телом, содержащим id новых заметок:

{"done": true, "result": [4]}

Выгрузим все заметки от имени читателя:

curl -v "http://127.0.0.1:8888/notes/export?token=reader_token"

Получим 200 OK с CSV телом с тремя заметками:

*   Trying 127.0.0.1:8888...
* Connected to 127.0.0.1 (127.0.0.1) port 8888 (#0)
> GET /notes/export?token=reader_token HTTP/1.1
> Host: 127.0.0.1:8888
> User-Agent: curl/7.88.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< Content-Disposition: attachment; filename="notes.csv"
< Content-Type: text/csv; charset=UTF-8
< X-Request-ID: 7f7d6030ddecaf054c8e7fd125df6c2f-127.0.0.1-1728392395.4553113
< Instance: readme
< Content-Length: 51
< Date: Tue, 08 Oct 2024 12:59:55 GMT
< Server: Python/3.11 aiohttp/3.10.5
< 
id,content,author_id
2,qqq,2
3,success,3
4,11,