mentortools/libs/: openapi-tools-abm-6.2.73946 metadata and description
| author | Mike Orlov |
| author_email | m.orlov@technokert.ru |
| classifiers |
|
| description_content_type | text/markdown |
| requires_dist |
|
| requires_python | >=3.11,<4.0 |
| File | Tox results | History |
|---|---|---|
openapi_tools_abm-6.2.73946-py3-none-any.whl
|
|
|
openapi_tools_abm-6.2.73946.tar.gz
|
|
OpenAPI tools
Библиотека позволяет без изменений в логике публиковать её методы в OpenAPI спецификации и HTTP сервере
Копирование кода для тестов
cp -r $ORIGIN_DIR/openapi_tools ./openapi_tools
Примеры использования
Минимальный пример: эндпоинт без аргументов
Допустим, мы хотим опубликовать эндпоинт по получению температуры CPU (захардкодим)
import asyncio
from http_tools import HttpServer, JsonableAnswer, HttpStatusCode
from openapi_tools import OpenApiServer, RpcEndpoint
# функция, которую мы хотим опубликовать
def get_temp(): # сигнатура функции может быть не типизирована
return 42
async def main():
# бойлерплейт
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config('1'),
OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
# Регистрация эндпоинта
api_server.register(RpcEndpoint(get_temp, "GET", "/temperature", [], {}, JsonableAnswer[HttpStatusCode.OK][int]))
# запуск сервера
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Получим варнинг, что в эндпоинте нет проверки доступа:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)
...: UserWarning: Endpoints without securities will be prohibited
warnings.warn('Endpoints without securities will be prohibited')
Можно проверить эндпоинт с помощью запроса из консоли:
curl --location "http://0.0.0.0:8888/temperature"
В ответ получим 200 ОК с телом
42
Так же можно получить спецификацию API:
curl --location "http://0.0.0.0:8888/open_api_specification/get"
В ответ получим 200 ОК со спецификацией:
{
"openapi": "3.1.0",
"info": {"title": "ABM:openapi:readme", "version": "1"},
"paths": {
"/temperature": {
"get": {
"summary": "",
"operationId": "get_temp",
"responses": {
"200": {"$ref": "#/components/responses/JsonableAnswer200Int"},
"500": {"$ref": "#/components/responses/ExceptionAnswer500"}
}
}
}
},
"components": {
"schemas": {
"schema_ErrorDescription": {
"type": "object",
"properties": {
"error": {"type": "string"},
"error_type": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"error_code": {"anyOf": [{"type": "integer"}, {"type": "null"}]},
"done": {"type": "boolean"}
}
}
},
"responses": {
"JsonableAnswer200Int": {
"description": "",
"content": {
"application/json": {"schema": {"type": "integer"}}
}
},
"ExceptionAnswer500": {
"description": "",
"content": {
"application/json": {"schema": {"$ref": "#/components/schemas/schema_ErrorDescription"}}
}
}
}
}
}
Рассмотрим подробнее код регистрации:
api_server.register(RpcEndpoint(get_now, "GET", "/now", [], {}, JsonableAnswer[HttpStatusCode.OK][int]))
get_now- обработчик"GET"- HTTP метод"/now"- HTTP путь[]- способы проверки авторизации, пока её пропустим и передадим пустой список - при пустом списке она не выполняется (детально она будет рассмотрена сильно ниже){}- описание получения значений для входных аргументов обработчика, наш обработчик не принимает аргументов, так что пока пропустимJsonableAnswer[HttpStatusCode.OK][int]- Тип HTTP ответа. Он должен- быть унаследован от
http_tools.Answerилиhttp_tools.StreamAnswer(если обработчик является генератором), - иметь определённый статус код (
HttpStatusCode) - иметь определённый тип содержимого (
JsonableAnswerимеетContentType.Json) - иметь определённую структуру (
payload_type).
- быть унаследован от
В данном примере наш ответ имеет статус код 200 OK, тип содержимого application/json и структура - один int
Пример с параметрами
В реальности у эндпоинтов почти всегда есть входные параметры.
Допустим, мы хотим опубликовать метод по извлечению корня из целого числа
import math
import asyncio
from http_tools import HttpServer, WrappedAnswer, HttpStatusCode
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
def sqrt(value: int) -> float:
return math.sqrt(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {
# ожидаем параметр 'arg' типа int в HTTP пути запроса и передаём его в аргумент 'value' обработчика
'value': PathParameter(name='arg', schema=int)
},
# Мы используем подход, что в теле json ответов всегда dict, содержащий метаинформацию об ответе,
# например, поле 'done', а данные лежат под ключом result.
# Всю эту логику реализует WrappedAnswer, дальше будет использовать его
WrappedAnswer[HttpStatusCode.OK][float]
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Получим варнинг, что в эндпоинте нет проверки доступа:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)
...: UserWarning: Endpoints without securities will be prohibited
warnings.warn('Endpoints without securities will be prohibited')
Можно проверить эндпоинт с помощью запроса из консоли:
curl --location "http://0.0.0.0:8888/sqrt/7"
Получим следующий ответ:
{"done": true, "result": 2.6457513110645907}
Если пошлём некорректный тип:
curl --location "http://0.0.0.0:8888/sqrt/bad"
то библиотека попробует скастить его в требуемый тип и мы получим следующий ответ:
{"error": "invalid literal for int() with base 10: 'bad'", "error_type": "ValueError", "error_code": null, "done": false}
Однако, если мы сделаем запрос с отрицательным числом
curl --location "http://0.0.0.0:8888/sqrt/-1"
то получим код 500 и довольно невнятную ошибку:
{"error": "math domain error", "error_type": "ValueError", "error_code": null, "done": false}
Это связано с тем, что мы не обрабатываем исключения, хотя они могут возникать
Пример обработки исключений
Доработаем обработку ошибок, для этого:
- добавим проверку чисел на отрицательность
- при регистрации обработчика добавим ответ для возникающего типа исключения
import math
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
def sqrt(value: int) -> float:
# Если значение переменной value меньше 0, то будет выброшено исключение AssertionError
assert value >= 0, f'Only non negative values allowed, got: {value}'
return math.sqrt(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
AssertionError: ExceptionAnswer[HttpStatusCode.BadRequest]
}))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим ответ на отрицательное число:
curl --location "http://0.0.0.0:8888/sqrt/-1"
Получим код 400 и более понятную ошибку:
{"error": "Only non negative values allowed, got: -1", "error_type": "AssertionError", "error_code": null, "done": false}
Если же требуется автоматизировать обработку различных ошибок на клиенте, то в ответе стоит использовать
http_tools.ErrorCode
Пример использования кодов ошибок
Для демонстрации допустим, что sqrt не может вычислять корень слишком больших значений
Создадим отдельные исключения и для каждого из них укажем ErrorCode
import math
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
class TooBigValue(ValueError):
pass
class NegativeValue(ValueError):
pass
def sqrt(value: int) -> float:
if value > 100:
raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
if value < 0:
raise NegativeValue(f'Only non negative values allowed, got: {value}')
return math.sqrt(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
}))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим ответ на отрицательное число:
curl --location "http://0.0.0.0:8888/sqrt/-1"
Получим код 400 и ошибку с кодом:
{"error": "Only non negative values allowed, got: -1", "error_type": "NegativeValue", "error_code": 2, "done": false}
Проверим ответ на большое число:
curl --location "http://0.0.0.0:8888/sqrt/111"
Получим код 400 и ошибку с кодом:
{"error": "Only values lower than 100 allowed, got: 111", "error_type": "TooBigValue", "error_code": 1, "done": false}
Однако, данный подход(коды ошибок указываются напрямую) может приводить к путанице при увеличении размеров API - в разных эндпоинтах один и тот же код может обозначать разное.
Пример наделения одного кода ошибки разными смыслами
Проиллюстрируем проблему, добавив ещё одну функцию бизнес-логики по возведению числа в степень, и опубликуем её в апи
import math
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
class TooBigValue(ValueError):
pass
class NegativeValue(ValueError):
pass
def sqrt(value: int) -> float:
if value > 100:
raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
if value < 0:
raise NegativeValue(f'Only non negative values allowed, got: {value}')
return math.sqrt(value)
def power(base: float, exponent: float) -> float:
if base < 0 and int(exponent) != exponent:
raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
return math.pow(base, exponent)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
}))
api_server.register(RpcEndpoint(
power, "GET", "/pow/{base}/{exponent}", [], {
'base': PathParameter(name='base', schema=float),
'exponent': PathParameter(name='exponent', schema=float),
},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
}))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим эндпоинт /math/pow:
curl "http://0.0.0.0:8888/pow/-1/-2.1"
Получим код 400 и ошибку с кодом 1:
{"error": "With fractional exponent(-2.1) only non negative bases allowed, got: -1.0", "error_type": "NegativeValue", "error_code": 1, "done": false}
При этом получается, что одна и та же проблема в разных эндпоинтах имеет разные коды,
что может путать разработчиков и усложнять код клиентского приложения.
Есть два подхода для составления API, более единообразного в плане кодов ошибок:
- Именованный реестр кодов ошибок
- Единый словарь обработки исключений
Пример использования именованного реестра кодов
Создадим enum со своим перечнем кодов ошибок, тогда при использовании это будут не просто числа, а именованные параметры
import enum
import math
import asyncio
import http_tools
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
class TooBigValue(ValueError):
pass
class NegativeValue(ValueError):
pass
def sqrt(value: int) -> float:
if value > 100:
raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
if value < 0:
raise NegativeValue(f'Only non negative values allowed, got: {value}')
return math.sqrt(value)
@enum.unique
class ErrorCode(http_tools.answer.ErrorCode, enum.Enum):
negative_value = 1
too_big_value = 2
def power(base: float, exponent: float) -> float:
if base < 0 and int(exponent) != exponent:
raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
return math.pow(base, exponent)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.too_big_value],
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_value],
}))
api_server.register(RpcEndpoint(
power, "GET", "/pow/{base}/{exponent}", [], {
'base': PathParameter(name='base', schema=float),
'exponent': PathParameter(name='exponent', schema=float),
},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_value],
}))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Такой подход требует минимальных изменений и увеличивает читаемость, но и с ним возможны ошибки - одно и то же исключение в разных эндпоинтах может отображаться на разные коды
Пример единого словаря обработки исключений
Мы можем собрать единый словарь со всеми типами исключений и выбирать из него необходимые для каждого эндпоинта
ВНИМАНИЕ, передавать этот словарь целиком при каждой регистрации обработчика НЕЛЬЗЯ
Это приведёт к формированию спецификации с огромным количеством лишних ответов
import math
import typing
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
class TooBigValue(ValueError):
pass
class NegativeValue(ValueError):
pass
def sqrt(value: int) -> float:
if value > 100:
raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
if value < 0:
raise NegativeValue(f'Only non negative values allowed, got: {value}')
return math.sqrt(value)
def power(base: float, exponent: float) -> float:
if base < 0 and int(exponent) != exponent:
raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
return math.pow(base, exponent)
class PickableDict(dict):
def pick(self, keys: typing.Iterable) -> dict:
"""Возвращает новый словарь только с выбранными ключами."""
return {k: self[k] for k in keys if k in self}
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
exception_to_answer = PickableDict({
TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
})
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_to_answer.pick({NegativeValue, TooBigValue})))
api_server.register(RpcEndpoint(
power, "GET", "/pow/{base}/{exponent}", [], {
'base': PathParameter(name='base', schema=float),
'exponent': PathParameter(name='exponent', schema=float),
},
WrappedAnswer[HttpStatusCode.OK][float], exception_to_answer.pick({NegativeValue})))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Этот способ на данный момент является рекомендуемым при использовании кодов ошибок в ответах
Пример предобработки аргументов обработчика
В прошлых примерах можно было увидеть, что параметры запроса прямо отображаются в аргументы обработчика,
но иногда требуется предобработка.
Рассмотрим пример, где мы публикуем функцию, считающую количество знаков в строке
Но перед подсчётом мы хотим удалить пробелы в начале и в конце строки
Принимать строку будет из query части запроса
НЕРАБОЧИЙ пример, пояснение ниже
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate
def get_len(value: str) -> int:
return len(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
get_len, "GET", "/len", [], {'value': QueryParameter(name='value', schema=str)},
WrappedAnswer[HttpStatusCode.OK][int]
))
api_server.register(RpcEndpoint(
get_len, "GET", "/len/strip", [], {'value': CallTemplate(str.strip, QueryParameter(name='value', schema=str))},
WrappedAnswer[HttpStatusCode.OK][int]
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Это НЕРАБОЧИЙ пример: каждый эндпоинт имеет "имя"(operationId) и это имя должно быть уникальным на сервере.
По дефолту в качестве имени используется __name__ обработчика.
Поэтому, если один обработчик требуется зарегистрировать более одного раза, то требуется явно указывать его operation_id.
Перепишем пример:
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate
def get_len(value: str) -> int:
return len(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
get_len, "GET", "/len", [], {'value': QueryParameter(name='value', schema=str)},
WrappedAnswer[HttpStatusCode.OK][int]
))
api_server.register(RpcEndpoint(
get_len, "GET", "/len/strip", [], {'value': CallTemplate(str.strip, QueryParameter(name='value', schema=str))},
WrappedAnswer[HttpStatusCode.OK][int], operation_id='get_len__stripped'
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим эндпоинт /len (пробел в урле обозначается символом '+'):
curl "http://0.0.0.0:8888/len?value=+test"
Получим {"done": true, "result": 5}
Проверим эндпоинт /len/strip:
curl "http://0.0.0.0:8888/len/strip?value=+test"
Получим {"done": true, "result": 4}
Пример более сложной предобработки аргументов обработчика
Рассмотрим пример, когда нам нужно опубликовать эндпоинт по расчёту расстояния между двумя точками (датаклассами)
Датаклассы можно создать из json строки, переданной в query части запроса, но это преобразование нужно описывать явно
Или можем принимать их из json тела. Реализуем оба варианта
import json
import dataclasses
import asyncio
from init_helpers.dict_to_dataclass import dict_to_dataclass
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate, JsonBodyParameter
@dataclasses.dataclass
class Point:
x: int
y: int
def get_dist(first: Point, second: Point) -> float:
return ((first.x - second.x) ** 2 + (first.y - second.y) ** 2) ** 0.5
def parse_json_as_point(value: str) -> Point:
return dict_to_dataclass(json.loads(value), Point)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
get_dist, "GET", "/dist", [], {
'first': CallTemplate(parse_json_as_point, QueryParameter(name='first', schema=str)),
'second': CallTemplate(parse_json_as_point, QueryParameter(name='second', schema=str)),
}, WrappedAnswer[HttpStatusCode.OK][float],
))
api_server.register(RpcEndpoint(
get_dist, "GET", "/dist/body", [], {
'first': JsonBodyParameter(name='first', schema=Point),
'second': JsonBodyParameter(name='second', schema=Point),
}, WrappedAnswer[HttpStatusCode.OK][float], operation_id='get_dist_body'
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим эндпоинт /dist:
curl -X GET --location "http://0.0.0.0:8888/dist?first=%7B%22x%22%3A1%2C%22y%22%3A2%7D&second=%7B%22x%22%3A3%2C%22y%22%3A4%7D"
Проверим эндпоинт /dist/body:
curl -X GET --location "http://0.0.0.0:8888/dist/body" \
-H "Content-Type: application/json" \
-d '{"first":{"x":1,"y":2},"second":{"x":3,"y":4}}'
Ответ одинаковый для обоих запросов: {"done": true, "result": 2.8284271247461903}
Пример автогенерации парамтеров запроса и типа ответа
В прошлых примерах можно заметить, что если не используется предобработка, то параметры, указываемые при регистрации,
повторяют имена и типы аргументов обработчика, а ответ использует возвращаемый тип в качестве схемы
Для сокращения кода есть функции, генерирующие объект RpcEndpoint на основе сигнатуры обработчика
Используем эндпоинты без предобработки из прошлых примеров
(/len для расчёта длины строки и /dist для расстояния между точками)
import dataclasses
import asyncio
from http_tools import HttpServer
from openapi_tools import OpenApiServer, gen_json_rpc_endpoint, gen_query_rpc_endpoint
@dataclasses.dataclass
class Point:
x: int
y: int
def get_dist(first: Point, second: Point) -> float:
return ((first.x - second.x) ** 2 + (first.y - second.y) ** 2) ** 0.5
def get_len(value: str) -> int:
return len(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(gen_json_rpc_endpoint(get_dist, "GET", "/dist", []))
api_server.register(gen_query_rpc_endpoint(get_len, "GET", "/len", []))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим эндпоинт /dist:
curl -X GET --location "http://0.0.0.0:8888/dist" \
-H "Content-Type: application/json" \
-d '{"first":{"x":1,"y":2},"second":{"x":3,"y":4}}'
Проверим эндпоинт /len:
curl "http://0.0.0.0:8888/len?value=test"
Аутентификация и авторизация
Базовая аутентификация
Во всех прошлых примерах можно заметить, что при генерации эндпоинта передаётся пустой массив.
Это список объектов, описывающих проверку прав доступа.
Чтобы ограничить доступ к какому-то эндпоинту достаточно при регистрации указать SecurityScheme или Security.
Грубо можно сказать, что SecurityScheme описывает аутентификацию: как проверяется секрет (токен/ключ),
а Security - авторизацию: как проверяются права на вызов эндпоинта.
Рассмотрим простейший пример - аутентификацию по фиксированному секрету
import math
import asyncio
from init_helpers import raise_if
from http_tools import HttpServer
from openapi_tools import OpenApiServer, ApiKeySecurityScheme, gen_query_rpc_endpoint, ParameterLocation, Unauthorized
def sqrt(value: int) -> float:
return math.sqrt(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
# Опишем resolver (способ проверки токена). Он должен возвращать информацию о пользователе и список его прав
# Для простоты захардкодим токен, будем возвращать None в качестве информации о пользователе и пустой набор прав
def resolver(token: str) -> tuple[None, list[str]]:
raise_if(token != 'secret', Unauthorized(f'{token=} is invalid'))
return None, []
# Создадим схему безопасности, требующую передачу токена в query части запроса под именем 'name', применим resolver
api_key_scheme = ApiKeySecurityScheme[None](location=ParameterLocation.query, name='token', resolver=resolver)
# Регистрируем эндпоинт с этой схемой
api_server.register(gen_query_rpc_endpoint(sqrt, "get", "/sqrt", [api_key_scheme],))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Делаем запрос без токена
curl "http://0.0.0.0:8888/sqrt?value=4"
Получаем в ответ 401 Unauthorized с телом. В теле описано,
какие варианты авторизации были проверены(api_key_in_query_token[]) и почему не подошли(Unauthorized()):
{
"error": {
"api_key_in_query_token[]": "Unauthorized()"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Сделаем запрос с неверным токеном
curl "http://0.0.0.0:8888/sqrt?value=4&token=qqq"
Получаем в ответ 401 Unauthorized. В теле видим исключение из resolver'а(Unauthorized("token='qqq' is invalid")):
{
"error": {
"api_key_in_query_token[]": "Unauthorized(\"token='qqq' is invalid\")"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Сделаем запрос с верным токеном
curl "http://0.0.0.0:8888/sqrt?value=4&token=secret"
Получим 200 OK с телом {"done": true, "result": 2.0}
Контроль доступа по скоупам (RBAC)
Часто аутентифицированные пользователи имеют разные набор полномочий и могут вызывать разный набор эндпоинтов.
Для этого в спецификации OpenAPI предусмотрена возможность каждому эндпоинту указать необходимый набор скоупов(прав).
Рассмотрим пример простой очереди задач:
- есть пользователи, которые ставят задачи
- воркеры, которые получают задачи
- админы, которые могут всё + могут переставить задачу из начала в конец очереди
import asyncio
from http_tools import HttpServer
from init_helpers import raise_if
from openapi_tools import OpenApiServer, gen_json_rpc_endpoint
from openapi_tools import HttpBearerSecurityScheme, Unauthorized
tasks_query = []
def add_task(target: int) -> list[int]:
tasks_query.append(target)
return tasks_query
def pop_task() -> int | None:
if tasks_query:
return tasks_query.pop(0)
def reset_task() -> list[int]:
if value := pop_task():
add_task(value)
return tasks_query
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
# Опишем resolver. Он должен возвращать информацию о пользователе и список его прав.
# Будем возвращать None в качестве информации о пользователе, а набор прав брать из словаря
def resolver(token: str) -> tuple[None, list[str]]:
scopes = {'user_token': ['add'], 'worker_token': ['pop'], 'admin_token': ['add', 'pop']}.get(token)
raise_if(scopes is None, Unauthorized(f'{token=} is invalid'))
return None, scopes
# Создадим схему безопасности, на основе HTTP bearer авторизации(передачи токена в хедере Authorisation)
http_auth = HttpBearerSecurityScheme[None](resolver=resolver)
# Регистрируем эндпоинты с этой схемой, указывая требуемые скоупы
api_server.register(gen_json_rpc_endpoint(add_task, "POST", "/task/add", [http_auth.has('add')]))
api_server.register(gen_json_rpc_endpoint(pop_task, "POST", "/task/pop", [http_auth.has('pop')]))
api_server.register(gen_json_rpc_endpoint(reset_task, "post", "/task/reset", [http_auth.has('add', 'pop')]))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Добавляем задачу от пользователя
curl -X POST --location "http://0.0.0.0:8888/task/add" \
-H "Authorization: Bearer user_token" -H "Content-Type: application/json" \
-d '{"target": 42}'
Получим 200 OK: {"done": true, "result": [42]}
Попробуем от имени этого же пользователя извлечь задачу
curl -X POST --location "http://0.0.0.0:8888/task/pop" \
-H "Authorization: Bearer user_token"
Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий права pop). Он сообщает, что
авторизация провалена(Forbidden)(т.е. аутентификация пройдена), т.к. не хватило прав(missing scopes), а именно: pop:
{
"error": {
"http_Bearer[pop]": "Forbidden('missing scopes: pop')"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Запросим задачу от имени воркера:
curl -X POST --location "http://0.0.0.0:8888/task/pop" \
-H "Authorization: Bearer worker_token"
Получим 200 OK: {"done": true, "result": 42}
Добавим три задачи от админа
curl -X POST --location "http://0.0.0.0:8888/task/add" \
-H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
-d '{"target": 1}'
curl -X POST --location "http://0.0.0.0:8888/task/add" \
-H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
-d '{"target": 2}'
curl -X POST --location "http://0.0.0.0:8888/task/add" \
-H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
-d '{"target": 3}'
На последний запрос получим 200 OK: {"done": true, "result": [1, 2, 3]}
А теперь от имени админа сбросим первую задачу в конец списка
curl -X POST --location "http://0.0.0.0:8888/task/reset" \
-H "Authorization: Bearer admin_token"
Получим 200 OK: {"done": true, "result": [2, 3, 1]}
Попробуем сделать это от имени пользователя:
curl -X POST --location "http://0.0.0.0:8888/task/reset" \
-H "Authorization: Bearer user_token"
Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий прав add и pop).
Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена),
т.к. не хватило прав(missing scopes), а именно: pop:
{
"error": {
"http_Bearer[add,pop]": "Forbidden('missing scopes: pop')"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Попробуем сделать это от имени воркера:
curl -X POST --location "http://0.0.0.0:8888/task/reset" \
-H "Authorization: Bearer worker_token"
Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий прав add и pop).
Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена),
т.к. не хватило прав(missing scopes), а именно: add:
{
"error": {
"http_Bearer[add,pop]": "Forbidden('missing scopes: add')"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Контроль доступа по атрибутам пользователей и объектов (ABAC)
Часто в проектах требуется ограничение не только на уровне доступа до эндпоинтов,
но и на уровне ограничения действий или объектов, обрабатываемых эндпоинтами.
Рассмотрим пример по мотивам системы доступа к камерам БР:
- каждая камера привязана к одному региону
- каждый пользователь имеет доступ к одному и более регионам
- каждый пользователь может иметь право на: просмотр текущего видео, просмотри архива, экспорт архива
import asyncio
import datetime
import enum
import itertools
import operator
import time
from dataclasses import dataclass
from typing import Iterable
from http_tools import HttpServer, HttpStatusCode, StreamAnswer, FileAnswer
from http_tools.answer import ExceptionAnswer, ErrorCode
from http_tools.mime_types import ContentType
from init_helpers import raise_if
from openapi_tools import OpenApiServer, HttpBearerSecurityScheme, gen_query_rpc_endpoint, Unauthorized, PickableDict
# опишем объект доступа
@dataclass
class Camera:
id: int
region_id: int
cameras = [Camera(10, 1), Camera(20, 2), Camera(21, 2), Camera(30, 3), Camera(31, 3), Camera(32, 3)]
# опишем исключения
@dataclass
class CameraNotFound(Exception):
camera_id: int
@dataclass
class MissingArchive(Exception):
at: float
# Вспомогательная функция, т.к. всем эндпоинтам нужна камера по id с ограничением по доступным пользователю регионам
def _get_camera(camera_id: int, only_region_ids: Iterable[int] | None) -> Camera | None:
only_cameras = cameras
if only_region_ids is not None:
only_region_ids = frozenset(only_region_ids)
only_cameras = [c for c in cameras if c.region_id in only_region_ids]
return next(iter(filter(lambda cam: cam.id == camera_id, only_cameras)), None)
async def watch_life_video(camera_id: int, only_region_ids: Iterable[int] | None) -> bytes:
raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
# Для упрощения примера не будем возвращать реальное видео, вернём текст
for i in itertools.count(int(time.time() / .7)):
yield f'<Camera {camera.id}, life video chunk {int(i)}>\n'.encode()
await asyncio.sleep(.7)
async def watch_archive_video(camera_id: int, at: float, only_region_ids: Iterable[int] | None) -> bytes:
raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
raise_if(at >= time.time(), MissingArchive(at))
# Для упрощения примера не будем возвращать реальное видео, вернём текст
for i in itertools.count(int(at / .7)):
yield f'<Camera {camera.id}, archive video chunk {int(i)}>\n'.encode()
await asyncio.sleep(.7)
@dataclass
class File:
content: bytes
name: str
def export_archive_video(
camera_id: int, at: float, duration: float, only_region_ids: Iterable[int] | None
) -> File:
raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
raise_if((till := at + duration) >= time.time(), MissingArchive(at + duration))
# Для упрощения примера не будем возвращать реальное видео, вернём заглушку
content = '\n'.join(f'<Camera {camera.id}, archive video chunk {i}>' for i in range(int(at / .7), int(till / .7)))
to_iso = lambda x: datetime.datetime.utcfromtimestamp(x).isoformat().replace(':', '-')
return File(content=content.encode(), name=f'cam_{camera_id}_{to_iso(at)}_{to_iso(till)}.mp4')
# Поскольку скоупы - это строки, то для удобства их стоит оформить в виде реестра строковых енумов
@enum.unique
class Scope(enum.StrEnum):
watch_life = 'watch_life'
watch_archive = 'watch_archive'
export_archive = 'export_archive'
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
exception_to_answer = PickableDict({
CameraNotFound: ExceptionAnswer[HttpStatusCode.NotFound][ErrorCode(1)],
MissingArchive: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
})
def resolver(token: str) -> tuple[None, list[str]]:
user_info = {
'u1': {'scopes': [Scope.watch_life], 'regions': [1]},
'u2': {'scopes': [Scope.watch_life, Scope.watch_archive], 'regions': [2]},
'u3': {'scopes': [Scope.watch_archive, Scope.export_archive], 'regions': [2, 3]},
}.get(token)
raise_if(user_info is None, Unauthorized(f'{token=} is invalid'))
return user_info, user_info['scopes']
regions_getter = operator.itemgetter('regions')
# Создадим схему безопасности, требующую передачу Bearer токена в Authorisation
auth = HttpBearerSecurityScheme[None](resolver=resolver)
api_server.register(gen_query_rpc_endpoint(
watch_life_video, "get", "/camera/{camera_id}/video/life/watch",
# Укажем, что эндпоинт доступен при наличии скоупа watch_life,
# в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
securities=[auth.has(Scope.watch_life, only_region_ids=regions_getter)],
# Используем StreamAnswer, поскольку эндпоинт содержит yield
answer_type=StreamAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound})
))
api_server.register(gen_query_rpc_endpoint(
watch_archive_video, "get", "/camera/{camera_id}/video/archive/watch",
# Укажем, что эндпоинт доступен при наличии скоупа watch_archive,
# в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
securities=[auth.has(Scope.watch_archive, only_region_ids=regions_getter)],
# Используем StreamAnswer, поскольку эндпоинт содержит yield
answer_type=StreamAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound, MissingArchive})
))
api_server.register(gen_query_rpc_endpoint(
export_archive_video, "get", "/camera/{camera_id}/video/archive/export",
# Укажем, что эндпоинт доступен при наличии скоупа export_archive,
# в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
securities=[auth.has(Scope.export_archive, only_region_ids=regions_getter)],
answer_type=FileAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
# Нужна фабрика, т.к. результат работы обработчика нельзя преобразовать в указанный тип ответа простой передачей
answer_factory=lambda file: FileAnswer[HttpStatusCode.OK][ContentType.Mp4Video](file.content, file.name),
exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound, MissingArchive})
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Запросим живое видео с камеры 10 от имени первого пользователя, с флагом -v, чтобы увидеть детали ответа:
curl -v "http://0.0.0.0:8888/camera/10/video/life/watch" \
-H "Authorization: Bearer u1"
Получим что-то вроде этого:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)
***
> GET /camera/10/video/life/watch HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u1
***
< HTTP/1.1 200 OK
< Content-Type: video/mp4
< Transfer-Encoding: chunked
< Date: ... GMT
< Server: Python/... aiohttp/...
<
<Camera 10, life video chunk ...>
***
Поток "видео" будет идти, пока мы его не прервём.
Запросим живое видео с этой же камеры 10 от имени второго пользователя, у которого нет доступа к региону 1:
curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
-H "Authorization: Bearer u2"
В ответ получим 404 Not Found с телом:
{
"error": "10",
"error_type": "CameraNotFound",
"error_code": 1,
"done": false
}
Запросим живое видео с этой же камеры 10 от имени третьего пользователя, у которого нет скоупа watch_life:
curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
-H "Authorization: Bearer u3"
В ответ получим 403 Forbidden с телом:
{
"error": {"http_Bearer[watch_life]": "Forbidden('missing scopes: watch_life')"},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Запросим живое видео с этой же камеры 10 от имени не существующего пользователя:
curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
-H "Authorization: Bearer u4"
В ответ получим 401 Unauthorized с телом:
{
"error": {"http_Bearer[watch_life]": "Unauthorized(\"token='u4' is invalid\")"},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Запросим архивное видео с камеры 10 от имени первого пользователя, у которого нет скоупа watch_archive:
curl "http://0.0.0.0:8888/camera/10/video/archive/watch?at=1000000" \
-H "Authorization: Bearer u1"
В ответ получим 403 Forbidden с телом:
{
"error": {"http_Bearer[watch_archive]": "Forbidden('missing scopes: watch_archive')"},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Запросим архивное видео с камеры 20 от имени второго пользователя:
curl -v "http://0.0.0.0:8888/camera/20/video/archive/watch?at=1000000" \
-H "Authorization: Bearer u2"
Получим:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)
***
> GET /camera/20/video/archive/watch?at=1000000 HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u2
***
< HTTP/1.1 200 OK
< Content-Type: video/mp4
< Transfer-Encoding: chunked
< Date: ...
< Server: Python/... aiohttp/...
<
<Camera 20, archive video chunk ...>
***
Поток "видео" будет идти, пока мы его не прервём.
Запросим экспорт архива с камеры 20 от имени третьего пользователя:
curl -v "http://0.0.0.0:8888/camera/20/video/archive/export?at=1000000&duration=3" \
-H "Authorization: Bearer u3"
Получим в ответ файл cam_20_1970-01-12T13-46-40_1970-01-12T13-47-00.mp4:
(Здесь и далее ... - любая подстрока, *** - любое количество любых строк)
***
> GET /camera/20/video/archive/export?at=1000000&duration=3 HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u3
***
< HTTP/1.1 200 OK
< Content-Disposition: attachment; filename="cam_20_1970-01-12T13-46-40_1970-01-12T13-46-43.mp4"
< Content-Type: video/mp4
< X-Request-ID: ...
< Instance: readme
< Content-Length: ...
< Date: ...
< Server: Python/... aiohttp/...
<
<Camera 20, archive video chunk ...>
<Camera 20, archive video chunk ...>
***