mentortools/libs/: openapi-tools-abm-5.1.65674a0 metadata and description
| author | Mike Orlov |
| author_email | m.orlov@technokert.ru |
| classifiers |
|
| description_content_type | text/markdown |
| requires_dist |
|
| requires_python | >=3.11,<4.0 |
| File | Tox results | History |
|---|---|---|
openapi_tools_abm-5.1.65674a0-py3-none-any.whl
|
|
|
openapi_tools_abm-5.1.65674a0.tar.gz
|
|
OpenAPI tools
Тест раннера
print(3+2)
Ожидаем
5
Библиотека позволяет без изменений в логике публиковать её методы в OpenAPI спецификации и HTTP сервере
Примеры использования
Минимальный пример: эндпоинт без аргументов
Допустим, мы хотим опубликовать эндпоинт по получению температуры CPU (захардкодим)
print('before imports, import time, asyncio')
import time
import asyncio
print(f'imported asyncio, import http_tools, {(now := time.time()):.3f}')
from http_tools import HttpServer, JsonableAnswer, HttpStatusCode
print(f'imported http_tools, import openapi_tools, {-now + (now := time.time()):.3f}')
from openapi_tools import OpenApiServer, RpcEndpoint
print(f'imported openapi_tools, run main, {-now + (now := time.time()):.3f}')
# функция, которую мы хотим опубликовать
def get_temp(): # сигнатура функции может быть не типизирована
return 42
async def main():
# бойлерплейт
print('start main')
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config('1'),
OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
print('register endpoint')
# Регистрация эндпоинта
api_server.register(RpcEndpoint(get_temp, "GET", "/temperature", [], {}, JsonableAnswer[HttpStatusCode.OK][int]))
print('start server')
# запуск сервера
async with http_server:
print('server started, sleep')
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Получим варнинг, что в эндпоинте нет проверки доступа:
before imports, import time, asyncio
imported asyncio, import http_tools, 17...
imported http_tools, import openapi_tools, ...
imported openapi_tools, run main,
start main
register endpoint
...: UserWarning: Endpoints without securities will be prohibited
warnings.warn('Endpoints without securities will be prohibited')
start server
server started, sleep
Проверим, что порт открыт:
ss -tpln | grep 8888
Ожидаем что-то такое:
... 0.0.0.0:8888 ... users:(("python",...
Можно проверить эндпоинт с помощью запроса из консоли:
curl --location "http://0.0.0.0:8888/temperature"
В ответ получим 200 ОК с телом
42
Так же можно получить спецификацию API:
curl --location "http://0.0.0.0:8888/open_api_specification/get"
В ответ получим 200 ОК со спецификацией:
{
"openapi": "3.1.0",
"info": {"title": "ABM:openapi:readme", "version": "1"},
"paths": {
"/temperature": {
"get": {
"summary": "",
"operationId": "get_temp",
"security": [],
"parameters": [],
"responses": {
"200": {"$ref": "#/components/responses/JsonableAnswer200Int"},
"500": {"$ref": "#/components/responses/ExceptionAnswer500"}
}
}
}
},
"components": {
"schemas": {
"schema_ErrorDescription": {
"type": "object",
"properties": {
"error": {"type": "string"},
"error_type": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"error_code": {"anyOf": [{"type": "integer"}, {"type": "null"}]},
"done": {"type": "boolean"}
}
}
},
"responses": {
"JsonableAnswer200Int": {
"description": "",
"content": {
"application/json": {"schema": {"type": "integer"}}
}
},
"ExceptionAnswer500": {
"description": "",
"content": {
"application/json": {"schema": {"$ref": "#/components/schemas/schema_ErrorDescription"}}
}
}
}
}
}
Рассмотрим подробнее код регистрации:
api_server.register(RpcEndpoint(get_now, "GET", "/now", [], {}, JsonableAnswer[HttpStatusCode.OK][int]))
get_now- обработчик"GET"- HTTP метод"/now"- HTTP путь[]- способы проверки авторизации, пока её пропустим и передадим пустой список - при пустом списке она не выполняется (детально она будет рассмотрена сильно ниже){}- описание получения значений для входных аргументов обработчика, наш обработчик не принимает аргументов, так что пока пропустимJsonableAnswer[HttpStatusCode.OK][int]- Тип HTTP ответа. Он должен- быть унаследован от
http_tools.Answerилиhttp_tools.StreamAnswer(если обработчик является генератором), - иметь определённый статус код (
HttpStatusCode) - иметь определённый тип содержимого (
JsonableAnswerимеетContentType.Json) - иметь определённую структуру (
payload_type).
- быть унаследован от
В данном примере наш ответ имеет статус код 200 OK, тип содержимого application/json и структура - один int
Пример с параметрами
В реальности у эндпоинтов почти всегда есть входные параметры.
Допустим, мы хотим опубликовать метод по извлечению корня из целого числа
import math
import asyncio
from http_tools import HttpServer, WrappedAnswer, HttpStatusCode
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
def sqrt(value: int) -> float:
return math.sqrt(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {
# ожидаем параметр 'arg' типа int в HTTP пути запроса и передаём его в аргумент 'value' обработчика
'value': PathParameter(name='arg', schema=int)
},
# Мы используем подход, что в теле json ответов всегда dict, содержащий метаинформацию об ответе,
# например, поле 'done', а данные лежат под ключом result.
# Всю эту логику реализует WrappedAnswer, дальше будет использовать его
WrappedAnswer[HttpStatusCode.OK][float]
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Можно проверить эндпоинт с помощью запроса из консоли:
curl --location "http://0.0.0.0:8888/sqrt/7"
Получим следующий ответ:
{"done": true, "result": 2.6457513110645907}
Если пошлём некорректный тип:
curl --location "http://0.0.0.0:8888/sqrt/bad"
то библиотека попробует скастить его в требуемый тип и мы получим следующий ответ:
{"error": "invalid literal for int() with base 10: 'bad'", "error_type": "ValueError", "error_code": null, "done": false}
Однако, если мы сделаем запрос с отрицательным числом
curl --location "http://0.0.0.0:8888/sqrt/-1"
то получим код 500 и довольно невнятную ошибку:
{"error": "math domain error", "error_type": "ValueError", "error_code": null, "done": false}
Это связано с тем, что мы не обрабатываем исключения, хотя они могут возникать
Пример обработки исключений
Доработаем обработку ошибок, для этого:
- добавим проверку чисел на отрицательность
- при регистрации обработчика добавим ответ для возникающего типа исключения
import math
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
def sqrt(value: int) -> float:
# Если значение переменной value меньше 0, то будет выброшено исключение AssertionError
assert value >= 0, f'Only non negative values allowed, got: {value}'
return math.sqrt(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
AssertionError: ExceptionAnswer[HttpStatusCode.BadRequest]
}))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим ответ на отрицательное число:
curl --location "http://0.0.0.0:8888/sqrt/-1"
Получим код 400 и более понятную ошибку:
{"error": "Only non negative values allowed, got: -1", "error_type": "AssertionError", "error_code": null, "done": false}
Если же требуется автоматизировать обработку различных ошибок на клиенте, то в ответе стоит использовать
http_tools.ErrorCode
Пример использования кодов ошибок
Для демонстрации допустим, что sqrt не может вычислять корень слишком больших значений
Создадим отдельные исключения и для каждого из них укажем ErrorCode
import math
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
class TooBigValue(ValueError):
pass
class NegativeValue(ValueError):
pass
def sqrt(value: int) -> float:
if value > 100:
raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
if value < 0:
raise NegativeValue(f'Only non negative values allowed, got: {value}')
return math.sqrt(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
}))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим ответ на отрицательное число:
curl --location "http://0.0.0.0:8888/sqrt/-1"
Получим код 400 и ошибку с кодом:
{"error": "Only non negative values allowed, got: -1", "error_type": "NegativeValue", "error_code": 2, "done": false}
Проверим ответ на большое число:
curl --location "http://0.0.0.0:8888/sqrt/111"
Получим код 400 и ошибку с кодом:
{"error": "Only values lower than 100 allowed, got: 111", "error_type": "TooBigValue", "error_code": 1, "done": false}
Однако, данный подход(коды ошибок указываются напрямую) может приводить к путанице при увеличении размеров API - в разных эндпоинтах один и тот же код может обозначать разное.
Пример наделения одного кода ошибки разными смыслами
Проиллюстрируем проблему, добавив ещё одну функцию бизнес-логики по возведению числа в степень, и опубликуем её в апи
import math
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
class TooBigValue(ValueError):
pass
class NegativeValue(ValueError):
pass
def sqrt(value: int) -> float:
if value > 100:
raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
if value < 0:
raise NegativeValue(f'Only non negative values allowed, got: {value}')
return math.sqrt(value)
def power(base: float, exponent: float) -> float:
if base < 0 and int(exponent) != exponent:
raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
return math.pow(base, exponent)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
}))
api_server.register(RpcEndpoint(
power, "GET", "/pow/{base}/{exponent}", [], {
'base': PathParameter(name='base', schema=float),
'exponent': PathParameter(name='exponent', schema=float),
},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
}))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим эндпоинт /math/pow:
curl "http://0.0.0.0:8888/pow/-1/-2.1"
Получим код 400 и ошибку с кодом 1:
{"error": "With fractional exponent(-2.1) only non negative bases allowed, got: -1.0", "error_type": "NegativeValue", "error_code": 1, "done": false}
При этом получается, что одна и та же проблема в разных эндпоинтах имеет разные коды,
что может путать разработчиков и усложнять код клиентского приложения.
Есть два подхода для составления API, более единообразного в плане кодов ошибок:
- Именованный реестр кодов ошибок
- Единый словарь обработки исключений
Пример использования именованного реестра кодов
Создадим enum со своим перечнем кодов ошибок, тогда при использовании это будут не просто числа, а именованные параметры
import enum
import math
import asyncio
import http_tools
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
class TooBigValue(ValueError):
pass
class NegativeValue(ValueError):
pass
def sqrt(value: int) -> float:
if value > 100:
raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
if value < 0:
raise NegativeValue(f'Only non negative values allowed, got: {value}')
return math.sqrt(value)
@enum.unique
class ErrorCode(http_tools.answer.ErrorCode, enum.Enum):
negative_value = 1
too_big_value = 2
def power(base: float, exponent: float) -> float:
if base < 0 and int(exponent) != exponent:
raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
return math.pow(base, exponent)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.too_big_value],
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_value],
}))
api_server.register(RpcEndpoint(
power, "GET", "/pow/{base}/{exponent}", [], {
'base': PathParameter(name='base', schema=float),
'exponent': PathParameter(name='exponent', schema=float),
},
WrappedAnswer[HttpStatusCode.OK][float], exception_type_to_answer_type={
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode.negative_value],
}))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Такой подход требует минимальных изменений и увеличивает читаемость, но и с ним возможны ошибки - одно и то же исключение в разных эндпоинтах может отображаться на разные коды
Пример единого словаря обработки исключений
Мы можем собрать единый словарь со всеми типами исключений и выбирать из него необходимые для каждого эндпоинта
ВНИМАНИЕ, передавать этот словарь целиком при каждой регистрации обработчика НЕЛЬЗЯ
Это приведёт к формированию спецификации с огромным количеством лишних ответов
import math
import typing
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, ErrorCode, WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, PathParameter
class TooBigValue(ValueError):
pass
class NegativeValue(ValueError):
pass
def sqrt(value: int) -> float:
if value > 100:
raise TooBigValue(f'Only values lower than 100 allowed, got: {value}')
if value < 0:
raise NegativeValue(f'Only non negative values allowed, got: {value}')
return math.sqrt(value)
def power(base: float, exponent: float) -> float:
if base < 0 and int(exponent) != exponent:
raise NegativeValue(f'With fractional exponent({exponent}) only non negative bases allowed, got: {base}')
return math.pow(base, exponent)
class PickableDict(dict):
def pick(self, keys: typing.Iterable) -> dict:
"""Возвращает новый словарь только с выбранными ключами."""
return {k: self[k] for k in keys if k in self}
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
exception_to_answer = PickableDict({
TooBigValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(1)],
NegativeValue: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
})
api_server.register(RpcEndpoint(
sqrt, "GET", "/sqrt/{arg}", [], {'value': PathParameter(name='arg', schema=int)},
WrappedAnswer[HttpStatusCode.OK][float], exception_to_answer.pick({NegativeValue, TooBigValue})))
api_server.register(RpcEndpoint(
power, "GET", "/pow/{base}/{exponent}", [], {
'base': PathParameter(name='base', schema=float),
'exponent': PathParameter(name='exponent', schema=float),
},
WrappedAnswer[HttpStatusCode.OK][float], exception_to_answer.pick({NegativeValue})))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Этот способ на данный момент является рекомендуемым при использовании кодов ошибок в ответах
Пример предобработки аргументов обработчика
В прошлых примерах можно было увидеть, что параметры запроса прямо отображаются в аргументы обработчика,
но иногда требуется предобработка.
Рассмотрим пример, где мы публикуем функцию, считающую количество знаков в строке
Но перед подсчётом мы хотим удалить пробелы в начале и в конце строки
Принимать строку будет из query части запроса
НЕРАБОЧИЙ пример, пояснение ниже
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate
def get_len(value: str) -> int:
return len(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
get_len, "GET", "/len", [], {'value': QueryParameter(name='value', schema=str)},
WrappedAnswer[HttpStatusCode.OK][int]
))
api_server.register(RpcEndpoint(
get_len, "GET", "/len/strip", [], {'value': CallTemplate(str.strip, QueryParameter(name='value', schema=str))},
WrappedAnswer[HttpStatusCode.OK][int]
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Это НЕРАБОЧИЙ пример: каждый эндпоинт имеет "имя"(operationId) и это имя должно быть уникальным на сервере.
По дефолту в качестве имени используется __name__ обработчика.
Поэтому, если один обработчик требуется зарегистрировать более одного раза, то требуется явно указывать его operation_id.
Перепишем пример:
import asyncio
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate
def get_len(value: str) -> int:
return len(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
get_len, "GET", "/len", [], {'value': QueryParameter(name='value', schema=str)},
WrappedAnswer[HttpStatusCode.OK][int]
))
api_server.register(RpcEndpoint(
get_len, "GET", "/len/strip", [], {'value': CallTemplate(str.strip, QueryParameter(name='value', schema=str))},
WrappedAnswer[HttpStatusCode.OK][int], operation_id='get_len__stripped'
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим эндпоинт /len (пробел в урле обозначается символом '+'):
curl "http://0.0.0.0:8888/len?value=+test"
Получим {"done": true, "result": 5}
Проверим эндпоинт /len/strip:
curl "http://0.0.0.0:8888/len/strip?value=+test"
Получим {"done": true, "result": 4}
Пример более сложной предобработки аргументов обработчика
Рассмотрим пример, когда нам нужно опубликовать эндпоинт по расчёту расстояния между двумя точками (датаклассами)
Датаклассы можно создать из json строки, переданной в query части запроса, но это преобразование нужно описывать явно
Или можем принимать их из json тела. Реализуем оба варианта
import json
import dataclasses
import asyncio
from init_helpers.dict_to_dataclass import dict_to_dataclass
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import WrappedAnswer
from openapi_tools import OpenApiServer, RpcEndpoint, QueryParameter, CallTemplate, JsonBodyParameter
@dataclasses.dataclass
class Point:
x: int
y: int
def get_dist(first: Point, second: Point) -> float:
return ((first.x - second.x) ** 2 + (first.y - second.y) ** 2) ** 0.5
def parse_json_as_point(value: str) -> Point:
return dict_to_dataclass(json.loads(value), Point)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(RpcEndpoint(
get_dist, "GET", "/dist", [], {
'first': CallTemplate(parse_json_as_point, QueryParameter(name='first', schema=str)),
'second': CallTemplate(parse_json_as_point, QueryParameter(name='second', schema=str)),
}, WrappedAnswer[HttpStatusCode.OK][float],
))
api_server.register(RpcEndpoint(
get_dist, "GET", "/dist/body", [], {
'first': JsonBodyParameter(name='first', schema=Point),
'second': JsonBodyParameter(name='second', schema=Point),
}, WrappedAnswer[HttpStatusCode.OK][float], operation_id='get_dist_body'
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим эндпоинт /dist:
curl -X GET --location "http://0.0.0.0:8888/dist?first=%7B%22x%22%3A1%2C%22y%22%3A2%7D&second=%7B%22x%22%3A3%2C%22y%22%3A4%7D"
Проверим эндпоинт /dist/body:
curl -X GET --location "http://0.0.0.0:8888/dist/body" \
-H "Content-Type: application/json" \
-d '{"first":{"x":1,"y":2},"second":{"x":3,"y":4}}'
Ответ одинаковый для обоих запросов: {"done": true, "result": 2.8284271247461903}
Пример автогенерации парамтеров запроса и типа ответа
В прошлых примерах можно заметить, что если не используется предобработка, то параметры, указываемые при регистрации,
повторяют имена и типы аргументов обработчика, а ответ использует возвращаемый тип в качестве схемы
Для сокращения кода есть функции, генерирующие объект RpcEndpoint на основе сигнатуры обработчика
Используем эндпоинты без предобработки из прошлых примеров
(/len для расчёта длины строки и /dist для расстояния между точками)
import dataclasses
import asyncio
from http_tools import HttpServer
from openapi_tools import OpenApiServer, gen_json_rpc_endpoint, gen_query_rpc_endpoint
@dataclasses.dataclass
class Point:
x: int
y: int
def get_dist(first: Point, second: Point) -> float:
return ((first.x - second.x) ** 2 + (first.y - second.y) ** 2) ** 0.5
def get_len(value: str) -> int:
return len(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
api_server.register(gen_json_rpc_endpoint(get_dist, "GET", "/dist", []))
api_server.register(gen_query_rpc_endpoint(get_len, "GET", "/len", []))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Проверим эндпоинт /dist:
curl -X GET --location "http://0.0.0.0:8888/dist" \
-H "Content-Type: application/json" \
-d '{"first":{"x":1,"y":2},"second":{"x":3,"y":4}}'
Проверим эндпоинт /len:
curl "http://0.0.0.0:8888/len?value=test"
Аутентификация и авторизация
Базовая аутентификация
Во всех прошлых примерах можно заметить, что при генерации эндпоинта передаётся пустой массив.
Это список объектов, описывающих проверку прав доступа.
Чтобы ограничить доступ к какому-то эндпоинту достаточно при регистрации указать SecurityScheme или Security.
Грубо можно сказать, что SecurityScheme описывает аутентификацию: как проверяется секрет (токен/ключ),
а Security - авторизацию: как проверяются права на вызов эндпоинта.
Рассмотрим простейший пример - аутентификацию по фиксированному секрету
import math
import asyncio
from init_helpers import raise_if
from http_tools import HttpServer
from openapi_tools import OpenApiServer, ApiKeySecurityScheme, gen_query_rpc_endpoint, ParameterLocation, Unauthorized
def sqrt(value: int) -> float:
return math.sqrt(value)
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
# Опишем resolver (способ проверки токена). Он должен возвращать информацию о пользователе и список его прав
# Для простоты захардкодим токен, будем возвращать None в качестве информации о пользователе и пустой набор прав
def resolver(token: str) -> tuple[None, list[str]]:
raise_if(token != 'secret', Unauthorized(f'{token=} is invalid'))
return None, []
# Создадим схему безопасности, требующую передачу токена в query части запроса под именем 'name', применим resolver
api_key_scheme = ApiKeySecurityScheme[None](location=ParameterLocation.query, name='token', resolver=resolver)
# Регистрируем эндпоинт с этой схемой
api_server.register(gen_query_rpc_endpoint(sqrt, "get", "/sqrt", [api_key_scheme],))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Делаем запрос без токена
curl "http://0.0.0.0:8888/sqrt?value=4"
Получаем в ответ 401 Unauthorized с телом. В теле описано,
какие варианты авторизации были проверены(api_key_in_query_token[]) и почему не подошли(Unauthorized()):
{
"error": {
"api_key_in_query_token[]": "Unauthorized()"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Сделаем запрос с неверным токеном
curl "http://0.0.0.0:8888/sqrt?value=4&token=qqq"
Получаем в ответ 401 Unauthorized. В теле видим исключение из resolver'а(Unauthorized("token='qqq' is invalid")):
{
"error": {
"api_key_in_query_token[]": "Unauthorized(\"token='qqq' is invalid\")"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Сделаем запрос с верным токеном
curl "http://0.0.0.0:8888/sqrt?value=4&token=secret"
Получим 200 OK с телом {"done": true, "result": 2.0}
Контроль доступа по скоупам (RBAC)
Часто аутентифицированные пользователи имеют разные набор полномочий и могут вызывать разный набор эндпоинтов.
Для этого в спецификации OpenAPI предусмотрена возможность каждому эндпоинту указать необходимый набор скоупов(прав).
Рассмотрим пример простой очереди задач:
- есть пользователи, которые ставят задачи
- воркеры, которые получают задачи
- админы, которые могут всё + могут переставить задачу из начала в конец очереди
import asyncio
from http_tools import HttpServer
from init_helpers import raise_if
from openapi_tools import OpenApiServer, gen_json_rpc_endpoint
from openapi_tools import HttpBearerSecurityScheme, Unauthorized
tasks_query = []
def add_task(target: int) -> list[int]:
tasks_query.append(target)
return tasks_query
def pop_task() -> int | None:
if tasks_query:
return tasks_query.pop(0)
def reset_task() -> list[int]:
if value := pop_task():
add_task(value)
return tasks_query
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
# Опишем resolver. Он должен возвращать информацию о пользователе и список его прав.
# Будем возвращать None в качестве информации о пользователе, а набор прав брать из словаря
def resolver(token: str) -> tuple[None, list[str]]:
scopes = {'user_token': ['add'], 'worker_token': ['pop'], 'admin_token': ['add', 'pop']}.get(token)
raise_if(scopes is None, Unauthorized(f'{token=} is invalid'))
return None, scopes
# Создадим схему безопасности, на основе HTTP bearer авторизации(передачи токена в хедере Authorisation)
http_auth = HttpBearerSecurityScheme[None](resolver=resolver)
# Регистрируем эндпоинты с этой схемой, указывая требуемые скоупы
api_server.register(gen_json_rpc_endpoint(add_task, "POST", "/task/add", [http_auth.has('add')]))
api_server.register(gen_json_rpc_endpoint(pop_task, "POST", "/task/pop", [http_auth.has('pop')]))
api_server.register(gen_json_rpc_endpoint(reset_task, "post", "/task/reset", [http_auth.has('add', 'pop')]))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Добавляем задачу от пользователя
curl -X POST --location "http://0.0.0.0:8888/task/add" \
-H "Authorization: Bearer user_token" -H "Content-Type: application/json" \
-d '{"target": 42}'
Получим 200 OK: {"done": true, "result": [42]}
Попробуем от имени этого же пользователя извлечь задачу
curl -X POST --location "http://0.0.0.0:8888/task/pop" \
-H "Authorization: Bearer user_token"
Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий права pop). Он сообщает, что
авторизация провалена(Forbidden)(т.е. аутентификация пройдена), т.к. не хватило прав(missing scopes), а именно: pop:
{
"error": {
"http_Bearer[pop]": "Forbidden('missing scopes: pop')"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Запросим задачу от имени воркера:
curl -X POST --location "http://0.0.0.0:8888/task/pop" \
-H "Authorization: Bearer worker_token"
Получим 200 OK: {"done": true, "result": 42}
Добавим три задачи от админа
curl -X POST --location "http://0.0.0.0:8888/task/add" \
-H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
-d '{"target": 1}'
curl -X POST --location "http://0.0.0.0:8888/task/add" \
-H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
-d '{"target": 2}'
curl -X POST --location "http://0.0.0.0:8888/task/add" \
-H "Authorization: Bearer admin_token" -H "Content-Type: application/json" \
-d '{"target": 3}'
На последний запрос получим 200 OK: {"done": true, "result": [1, 2, 3]}
А теперь от имени админа сбросим первую задачу в конец списка
curl -X POST --location "http://0.0.0.0:8888/task/reset" \
-H "Authorization: Bearer admin_token"
Получим 200 OK: {"done": true, "result": [2, 3, 1]}
Попробуем сделать это от имени пользователя:
curl -X POST --location "http://0.0.0.0:8888/task/reset" \
-H "Authorization: Bearer user_token"
Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий прав add и pop).
Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена),
т.к. не хватило прав(missing scopes), а именно: pop:
{
"error": {
"http_Bearer[add,pop]": "Forbidden('missing scopes: pop')"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Попробуем сделать это от имени воркера:
curl -X POST --location "http://0.0.0.0:8888/task/reset" \
-H "Authorization: Bearer worker_token"
Получим 403 с указанием, что проверен один вариант авторизации(http_Bearer требующий прав add и pop).
Он сообщает, что авторизация провалена(Forbidden)(т.е. аутентификация пройдена),
т.к. не хватило прав(missing scopes), а именно: add:
{
"error": {
"http_Bearer[add,pop]": "Forbidden('missing scopes: add')"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Контроль доступа по атрибутам пользователей и объектов (ABAC)
Часто в проектах требуется ограничение не только на уровне доступа до эндпоинтов,
но и на уровне ограничения действий или объектов, обрабатываемых эндпоинтами
Рассмотрим пример по мотивам системы доступа к камерам БР:
- каждая камера привязана к одному региону
- каждый пользователь имеет доступ к одному и более регионам
- каждый пользователь может иметь право на: просмотр текущего видео, просмотри архива, экспорт архива
import asyncio
import datetime
import enum
import itertools
import operator
import time
from dataclasses import dataclass
from typing import Iterable
from http_tools import HttpServer, HttpStatusCode, StreamAnswer, FileAnswer
from http_tools.answer import ExceptionAnswer, ErrorCode
from http_tools.mime_types import ContentType
from init_helpers import raise_if
from openapi_tools import OpenApiServer, HttpBearerSecurityScheme, gen_query_rpc_endpoint, Unauthorized, PickableDict
# опишем объект доступа
@dataclass
class Camera:
id: int
region_id: int
cameras = [Camera(10, 1), Camera(20, 2), Camera(21, 2), Camera(30, 3), Camera(31, 3), Camera(32, 3)]
# опишем исключения
@dataclass
class CameraNotFound(Exception):
camera_id: int
@dataclass
class MissingArchive(Exception):
at: float
# Вспомогательная функция, т.к. всем эндпоинтам нужна камера по id с ограничением по доступным пользователю регионам
def _get_camera(camera_id: int, only_region_ids: Iterable[int] | None) -> Camera | None:
only_cameras = cameras
if only_region_ids is not None:
only_region_ids = frozenset(only_region_ids)
only_cameras = [c for c in cameras if c.region_id in only_region_ids]
return next(iter(filter(lambda cam: cam.id == camera_id, only_cameras)), None)
async def watch_life_video(camera_id: int, only_region_ids: Iterable[int] | None) -> bytes:
raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
# Для упрощения примера не будем возвращать реальное видео, вернём текст
for i in itertools.count(int(time.time() / .3)):
yield f'<Camera {camera.id}, life video chunk {int(i)}>\n'.encode()
await asyncio.sleep(.3)
async def watch_archive_video(camera_id: int, at: float, only_region_ids: Iterable[int] | None) -> bytes:
raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
raise_if(at >= time.time(), MissingArchive(at))
# Для упрощения примера не будем возвращать реальное видео, вернём текст
for i in itertools.count(int(at / .3)):
yield f'<Camera {camera.id}, archive video chunk {int(i)}>\n'.encode()
await asyncio.sleep(.3)
@dataclass
class File:
content: bytes
name: str
def export_archive_video(
camera_id: int, at: float, duration: float, only_region_ids: Iterable[int] | None
) -> File:
raise_if((camera := _get_camera(camera_id, only_region_ids)) is None, CameraNotFound(camera_id))
raise_if((till := at + duration) >= time.time(), MissingArchive(at + duration))
# Для упрощения примера не будем возвращать реальное видео, вернём заглушку
content = '\n'.join(f'<Camera {camera.id}, archive video chunk {i}>' for i in range(int(at / .3), int(till / .3)))
to_iso = lambda x: datetime.datetime.fromtimestamp(x).isoformat().replace(':', '-')
return File(content=content.encode(), name=f'cam_{camera_id}_{to_iso(at)}_{to_iso(till)}.mp4')
# Поскольку скоупы - это строки, то для удобства их стоит оформить в виде реестра строковых енумов
@enum.unique
class Scope(enum.StrEnum):
watch_life = 'watch_life'
watch_archive = 'watch_archive'
export_archive = 'export_archive'
async def main():
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
exception_to_answer = PickableDict({
CameraNotFound: ExceptionAnswer[HttpStatusCode.NotFound][ErrorCode(1)],
MissingArchive: ExceptionAnswer[HttpStatusCode.BadRequest][ErrorCode(2)],
})
def resolver(token: str) -> tuple[None, list[str]]:
user_info = {
'u1': {'scopes': [Scope.watch_life], 'regions': [1]},
'u2': {'scopes': [Scope.watch_life, Scope.watch_archive], 'regions': [2]},
'u3': {'scopes': [Scope.watch_archive, Scope.export_archive], 'regions': [2, 3]},
}.get(token)
raise_if(user_info is None, Unauthorized(f'{token=} is invalid'))
return user_info, user_info['scopes']
regions_getter = operator.itemgetter('regions')
# Создадим схему безопасности, требующую передачу Bearer токена в Authorisation
auth = HttpBearerSecurityScheme[None](resolver=resolver)
api_server.register(gen_query_rpc_endpoint(
watch_life_video, "get", "/camera/{camera_id}/video/life/watch",
# Укажем, что эндпоинт доступен при наличии скоупа watch_life,
# в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
securities=[auth.has(Scope.watch_life, only_region_ids=regions_getter)],
# Используем StreamAnswer, поскольку эндпоинт содержит yield
answer_type=StreamAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound})
))
api_server.register(gen_query_rpc_endpoint(
watch_archive_video, "get", "/camera/{camera_id}/video/archive/watch",
# Укажем, что эндпоинт доступен при наличии скоупа watch_archive,
# в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
securities=[auth.has(Scope.watch_archive, only_region_ids=regions_getter)],
# Используем StreamAnswer, поскольку эндпоинт содержит yield
answer_type=StreamAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound, MissingArchive})
))
api_server.register(gen_query_rpc_endpoint(
export_archive_video, "get", "/camera/{camera_id}/video/archive/export",
# Укажем, что эндпоинт доступен при наличии скоупа export_archive,
# в качестве значения аргумента only_region_ids передадим результат выполнения regions_getter
securities=[auth.has(Scope.export_archive, only_region_ids=regions_getter)],
answer_type=FileAnswer[HttpStatusCode.OK][ContentType.Mp4Video],
# Нужна фабрика, т.к. результат работы обработчика нельзя преобразовать в указанный тип ответа простой передачей
answer_factory=lambda file: FileAnswer[HttpStatusCode.OK][ContentType.Mp4Video](file.content, file.name),
exception_type_to_answer_type=exception_to_answer.pick({CameraNotFound, MissingArchive})
))
async with http_server:
await asyncio.sleep(10 ** 10)
asyncio.run(main())
Запросим живое видео с камеры 10 от имени первого пользователя, с флагом -v, чтобы увидеть детали ответа:
curl -v "http://0.0.0.0:8888/camera/10/video/life/watch" \
-H "Authorization: Bearer u1"
Получим что-то вроде этого:
* Trying 0.0.0.0:8888...
* Connected to 0.0.0.0 (...) port 8888 (#0)
> GET /camera/10/video/life/watch HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u1
>
< HTTP/1.1 200 OK
< Content-Type: video/mp4
< Transfer-Encoding: chunked
< Date: ... GMT
< Server: Python/... aiohttp/...
<
<Camera 10, life video chunk 5...>
<Camera 10, life video chunk 5...>
<Camera 10, life video chunk 5...>
<Camera 10, life video chunk 5...>
Поток "видео" будет идти, пока мы его не прервём.
Запросим живое видео с этой же камеры 10 от имени второго пользователя, у которого нет доступа к региону 1:
curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
-H "Authorization: Bearer u2"
В ответ получим 404 Not Found с телом:
{
"error": "10",
"error_type": "CameraNotFound",
"error_code": 1,
"done": false
}
Запросим живое видео с этой же камеры 10 от имени третьего пользователя, у которого нет скоупа watch_life:
curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
-H "Authorization: Bearer u3"
В ответ получим 403 Forbidden с телом:
{
"error": {"http_Bearer[watch_life]": "Forbidden('missing scopes: watch_life')"},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Запросим живое видео с этой же камеры 10 от имени не существующего пользователя:
curl "http://0.0.0.0:8888/camera/10/video/life/watch" \
-H "Authorization: Bearer u4"
В ответ получим 401 Unauthorized с телом:
{
"error": {"http_Bearer[watch_life]": "Unauthorized(\"token='u4' is invalid\")"},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Запросим архивное видео с камеры 10 от имени первого пользователя, у которого нет скоупа watch_archive:
curl "http://0.0.0.0:8888/camera/10/video/archive/watch?at=1000000" \
-H "Authorization: Bearer u1"
В ответ получим 403 Forbidden с телом:
{
"error": {"http_Bearer[watch_archive]": "Forbidden('missing scopes: watch_archive')"},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Запросим архивное видео с камеры 20 от имени второго пользователя:
curl -v "http://0.0.0.0:8888/camera/20/video/archive/watch?at=1000000" \
-H "Authorization: Bearer u2"
Получим:
* Trying 0.0.0.0:8888...
* Connected to 0.0.0.0 (...) port 8888 (#0)
> GET /camera/20/video/archive/watch?at=1000000 HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u2
>
< HTTP/1.1 200 OK
< Content-Type: video/mp4
< Transfer-Encoding: chunked
< Date: ...
< Server: Python/... aiohttp/...
<
<Camera 20, archive video chunk 3333333>
<Camera 20, archive video chunk 3333334>
<Camera 20, archive video chunk 3333335>
<Camera 20, archive video chunk 3333336>
Поток "видео" будет идти, пока мы его не прервём.
Запросим экспорт архива с камеры 20 от имени третьего пользователя:
curl -v "http://0.0.0.0:8888/camera/20/video/archive/export?at=1000000&duration=2" \
-H "Authorization: Bearer u3"
Получим в ответ файл cam_20_1970-01-12T16-46-40_1970-01-12T16-47-00.mp4:
* Trying 0.0.0.0:8888...
* Connected to 0.0.0.0 (...) port 8888 (#0)
> GET /camera/20/video/archive/export?at=1000000&duration=2 HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
> Authorization: Bearer u3
>
< HTTP/1.1 200 OK
< Content-Disposition: attachment; filename="cam_20_1970-01-12T16-46-40_1970-01-12T16-46-42.mp4"
< Content-Type: video/mp4
< X-Request-ID: ...
< Instance: readme
< Content-Length: 286
< Date: ...
< Server: Python/... aiohttp/...
<
<Camera 20, archive video chunk 3333333>
<Camera 20, archive video chunk 3333334>
<Camera 20, archive video chunk 3333335>
<Camera 20, archive video chunk 3333336>
<Camera 20, archive video chunk 3333337>
<Camera 20, archive video chunk 3333338>
* Connection #0 to host 0.0.0.0 left intact
<Camera 20, archive video chunk 3333339>
Эндпоинты с несколькими механизмами доступа
Бывает, что один и тот же эндпоинт должен накладывать разные ограничения или даже не накладывать никаких,
в зависимости от аутентификационной и/или авторизационной информации.
Например, если пользователи могут работать со "своими" ресурсами и есть админ, имеющий доступ ко всем ресурсам.
Или если к этому сервису могут обращаться другие сервисы.
Рассмотрим следующий пример:
- В приложении есть одна сущность - заметка(Note).
- Пользователи могут создавать, копировать, модифицировать и удалять заметки. И выгрузить csv.
- Пользователи видят только свои заметки (которые сами и создали).
- Кроме обычных пользователей есть читатели, которые видят все заметки, но не могут модифицировать
- Так же есть админы, которые видят всё и могут всё
- Аутентификацию будем выполнять с помощью передачи токена в query части запроса
- И добавим серверную(для других серверов) аутентификацию с помощью хедера
server_name
ВНИМАНИЕ! Порядок объектов в security ВАЖЕН: будет использоваться ПЕРВЫЙ подходящий
import asyncio
import contextlib
import csv
import enum
import io
import operator
from typing import Literal, Any, TypedDict, TypeAlias
import sqlalchemy
from http_tools import HttpServer, HttpStatusCode
from http_tools.answer import ExceptionAnswer, FileAnswer, WrappedAnswer
from http_tools.mime_types import ContentType
from init_helpers import raise_if
from sqlalchemy import select, Integer, String
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column
from openapi_tools import ApiKeySecurityScheme, ParameterLocation, OpenApiServer, gen_query_rpc_endpoint, \
gen_json_rpc_endpoint, RpcEndpoint, Unauthorized, RawBodyParameter, PickableDict
# Реестр сущностей SQLAlchemy
sqlalchemy_mapper_registry = sqlalchemy.orm.registry()
# Алиас для не гарантированного объекта сессии (будет понятно дальше)
MaybeSession: TypeAlias = AsyncSession | None
# Опишем сущность заметки
@sqlalchemy_mapper_registry.mapped_as_dataclass
class Note:
__tablename__ = 'notes'
id: Mapped[int | None] = mapped_column(Integer, primary_key=True, init=False)
content: Mapped[str] = mapped_column(String, nullable=False)
author_id: Mapped[int | None] = mapped_column(Integer, nullable=True, index=True)
# helper для удобства переиспользования методов (будет видно ниже)
@contextlib.asynccontextmanager
async def ignore_async_with(value: Any):
yield value
class Controller:
def __init__(self, session_maker: sessionmaker):
self.session_maker = session_maker
# Метод для возвращения сессии БД, при получении существующей сессии, оборачивает её в helper для скипа async with
def ensure_session(self, session: MaybeSession = None) -> AsyncSession:
return self.session_maker() if session is None else ignore_async_with(session)
# Здесь и далее аргумент session позволяет управлять, создаётся объект в новой сессии БД или в существующей
async def create_note(self, content: str, author_id: int | None, session: MaybeSession = None) -> Note:
async with self.ensure_session(session) as session:
session.add(new_note := Note(content=content, author_id=author_id))
await session.commit()
return new_note
# Здесь и далее аргумент only_author_id используется для ограничения доступа
async def get_notes(self, only_author_id: int | None, limit: int | None = 100, offset: int = 0) -> list[Note]:
async with self.ensure_session() as session:
query = select(Note).limit(limit).offset(offset)
query = query if only_author_id is None else query.where(Note.author_id == only_author_id)
return list((await session.execute(query)).scalars())
async def export_notes(self, only_author_id: int) -> tuple[bytes, str]:
notes = await self.get_notes(only_author_id=only_author_id, limit=None, offset=0)
writer = csv.writer(output := io.StringIO())
writer.writerow(['id', 'content', 'author_id'])
[writer.writerow([note.id, note.content, note.author_id]) for note in notes]
return output.getvalue().encode(), 'notes.csv'
async def get_note(self, note_id: int, only_author_id: int | None, session: MaybeSession = None) -> Note:
query = select(Note).where(Note.id == note_id)
query = query if only_author_id is None else query.where(Note.author_id == only_author_id)
async with self.ensure_session(session) as session:
return (await session.execute(query)).scalar_one()
async def copy_note(self, note_id: int, only_author_id: int | None, author_id: int | None) -> Note:
async with self.ensure_session() as session:
old_note = await self.get_note(note_id=note_id, only_author_id=only_author_id, session=session)
new_note = await self.create_note(content=old_note.content, author_id=author_id, session=session)
await session.commit()
return new_note
async def update_note(self, note_id: int, content: str, only_author_id: int | None) -> Note:
async with self.ensure_session() as session:
# используется метод, описанный выше
note = await self.get_note(note_id=note_id, only_author_id=only_author_id, session=session)
note.content = content
await session.commit()
return note
async def delete_note(self, note_id: int, only_author_id: int | None) -> None:
async with self.ensure_session() as session:
note = await self.get_note(note_id=note_id, only_author_id=only_author_id, session=session)
await session.delete(note)
await session.commit()
async def import_notes(self, csv_file: bytes, author_id: int | None) -> list[int]:
# Не хочу писать реальный парсер, буду вне зависимости от содержимого создавать одну заметку
async with self.ensure_session() as session:
new_note = await self.create_note(content=str(len(csv_file)), author_id=author_id, session=session)
await session.commit()
return [new_note.id]
async def main():
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=True)
async with engine.begin() as conn:
await conn.run_sync(sqlalchemy_mapper_registry.metadata.create_all)
http_server = HttpServer(HttpServer.Config(port=8888), HttpServer.Context(instance_id="readme"))
api_server = OpenApiServer(OpenApiServer.Config(), OpenApiServer.Context('ABM', 'openapi', 'readme', http_server))
# noinspection PyTypeChecker
controller = Controller(sessionmaker(engine, expire_on_commit=False, class_=AsyncSession))
# Опишем обработку исключений
exception_to_answer = PickableDict({
sqlalchemy.exc.NoResultFound: ExceptionAnswer[HttpStatusCode.NotFound],
})
# Опишем скоупы в виде enum
class Scope(enum.StrEnum):
create = 'create' # право на создание
read_own = 'read_own' # право на чтение своих заметок
read_any = 'read_any' # право на чтение любых заметок
update_own = 'update_own' # право на модификацию своих заметок
update_any = 'update_any' # право на модификацию любых заметок
delete_own = 'delete_own' # право на удаление своих заметок
delete_any = 'delete_any' # право на удаление любых заметок
other = 'other' # иное право(не испольузется)
# Опишем структуру информации о пользователе, получаемую при аутентификации по токену
class UserInfo(TypedDict):
user_id: int
scope: list[str]
# Описываем callable для получения id пользователя из UserInfo
get_user_id = operator.itemgetter('user_id')
# Опишем структуру информации о пользователе, получаемую при аутентификации с помощью хедера
class EmptyDict(TypedDict):
pass
# функция, подменяющая внешнюю авторизационную систему в нашем примере
# сигнатура всегда такая - возвращает кортеж из информации о пользователе и доступных скоупах
def resolver(token: str) -> tuple[UserInfo, list[Scope]]:
auth_info = {
'first_token': {'user_id': 1, 'scope': [Scope.create, Scope.read_own, Scope.update_own, Scope.delete_own]},
'second_token': {'user_id': 2, 'scope': [Scope.create, Scope.read_own, Scope.update_own, Scope.delete_own]},
'admin_token': {'user_id': 3, 'scope': [Scope.create, Scope.read_any, Scope.update_any, Scope.delete_any]},
'other_token': {'user_id': 5, 'scope': [Scope.other]},
'reader_token': {'user_id': 6, 'scope': [Scope.read_any]},
}.get(token)
raise_if(auth_info is None, Unauthorized(f'{token=} is invalid'))
return auth_info, auth_info['scope']
# Опишем основную схему авторизации: через токен в query
query_auth = ApiKeySecurityScheme[UserInfo](location=ParameterLocation.query, name='token', resolver=resolver)
# Опишем дополнительную схему авторизации через хедер.
# Мы не хотим сообщать о её наличии, поэтому добавляем do_log=False
header_auth = ApiKeySecurityScheme[EmptyDict](
location=ParameterLocation.header, name='server_name', resolver=lambda x: ({}, list[Scope]), do_log=False)
# Зарегистрируем метод создания заметки
api_server.register(gen_json_rpc_endpoint(
controller.create_note, "POST", "/notes",
# Все security обязаны иметь одинаковый перечень ключевых аргументов, даже если у обработчика есть дефолт
securities=[
# При обработке запросов, аутентифицированных через хедер, в значении аргумента author_id используем None
header_auth.has(author_id=Literal[None]),
# При обработке запросов, аутентифицированных через query, в значении аргумента author_id используем user_id
query_auth.has(Scope.create, author_id=get_user_id)
], arg_name_to_param={
'session': None # При автогенерации нужно указать, что мы не хотим публиковать аргумент session
}
))
api_server.register(gen_query_rpc_endpoint(
controller.get_notes, "GET", "/notes", securities=[
# При аутентификации через хедер, передаём None в only_author_id, чем снимаем ограничения на видимые заметки
header_auth.has(only_author_id=Literal[None]),
# При аутентификации через query, если есть скоуп read_any, то тоже передаём None в only_author_id
query_auth.has(Scope.read_any, only_author_id=Literal[None]),
# При аутентификации через query, если прошлый метод авторизации не сработал, т.е. нет скоупа read_any,
# то передаём значение user_id в only_author_id, тем самым ограничивая видимые заметки
query_auth.has(Scope.read_own, only_author_id=get_user_id),
], exception_type_to_answer_type={}
))
# Серверам и юзерам с read_any доступны для чтения все заметки, с read_own - только свои
api_server.register(gen_query_rpc_endpoint(
controller.get_note, "GET", "/notes/{note_id}", securities=[
header_auth.has(only_author_id=Literal[None]),
query_auth.has(Scope.read_any, only_author_id=Literal[None]),
query_auth.has(Scope.read_own, only_author_id=get_user_id),
], arg_name_to_param={'session': None},
exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
))
# Серверам и юзерам с read_any доступны для чтения все заметки, с read_own - только свои
api_server.register(RpcEndpoint(
controller.export_notes, "GET", "/notes/export", securities=[
header_auth.has(only_author_id=Literal[None]),
query_auth.has(Scope.read_any, only_author_id=Literal[None]),
query_auth.has(Scope.read_own, only_author_id=get_user_id),
], arg_name_to_param={},
answer_type=FileAnswer[HttpStatusCode.OK][ContentType.CSV],
# Нужно указать answer_factory, т.к. класс FileAnswer.__init__ не принимает напрямую tuple[Bytes, str]
answer_factory=lambda x: FileAnswer[HttpStatusCode.OK][ContentType.CSV](payload=x[0], file_name=x[1]),
exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
))
# Сервера создают заметки без авторства, юзеры с create - со своим авторством
api_server.register(RpcEndpoint(
controller.import_notes, "POST", "/notes/import", securities=[
header_auth.has(author_id=Literal[None]),
query_auth.has(Scope.create, author_id=get_user_id),
], arg_name_to_param={'csv_file': RawBodyParameter[ContentType.CSV]},
answer_type=WrappedAnswer[HttpStatusCode.OK][list[int]],
exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
))
# Серверам и юзерам с update_any доступны для изменения все заметки, с update_own - только свои
api_server.register(gen_json_rpc_endpoint(
controller.update_note, "PUT", "/notes/{note_id}", securities=[
header_auth.has(only_author_id=Literal[None]),
query_auth.has(Scope.update_any, only_author_id=Literal[None]),
query_auth.has(Scope.update_own, only_author_id=get_user_id),
],
exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
))
# Серверам и юзерам с delete_any доступны для удаления все заметки, с delete_own - только свои
api_server.register(gen_query_rpc_endpoint(
controller.delete_note, "DELETE", "/notes/{note_id}", securities=[
header_auth.has(only_author_id=Literal[None]),
query_auth.has(Scope.delete_any, only_author_id=Literal[None]),
query_auth.has(Scope.delete_own, only_author_id=get_user_id),
],
exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
))
# Серверам и юзерам с read_any&create доступны для копирования все заметки, с read_own&create - только свои
api_server.register(gen_query_rpc_endpoint(
controller.copy_note, "POST", "/notes/{note_id}", securities=[
header_auth.has(only_author_id=Literal[None], author_id=Literal[None]),
query_auth.has(Scope.read_any, Scope.create, only_author_id=Literal[None], author_id=get_user_id),
query_auth.has(Scope.read_own, Scope.create, only_author_id=get_user_id, author_id=get_user_id),
],
exception_type_to_answer_type=exception_to_answer.pick({sqlalchemy.exc.NoResultFound})
))
async with http_server:
await asyncio.sleep(10 ** 10)
if __name__ == '__main__':
asyncio.run(main())
Запросим список заметок от имени админа:
curl "http://0.0.0.0:8888/notes?token=admin_token"
Получим 200 OK с телом:
{"done": true, "result": []}
Это значит, что заметок нет.
Создадим заметку от первого пользователя:
curl -X POST --location "http://0.0.0.0:8888/notes?token=first_token" \
-H "Content-Type: application/json" \
-d '{"content": "111"}'
Получим 200 OK с телом:
{"done": true, "result": {"id": 1, "content": "111", "author_id": 1}}
Запросим список заметок от имени первого пользователя:
curl "http://0.0.0.0:8888/notes?token=first_token"
Получим 200 OK с телом:
{"done": true, "result": [{"id": 1, "content": "111", "author_id": 1}]}
Запросим список заметок от имени второго пользователя:
curl "http://0.0.0.0:8888/notes?token=second_token"
Получим 200 OK с телом:
{"done": true, "result": []}
Создадим заметку от второго пользователя:
curl -X POST --location "http://0.0.0.0:8888/notes?token=second_token" \
-H "Content-Type: application/json" \
-d '{"content": "222"}'
Получим 200 OK с телом:
{"done": true, "result": {"id": 2, "content": "222", "author_id": 2}}
Запросим список заметок от имени другого сервера, аутентифицируясь через хедер server_name:
curl "http://0.0.0.0:8888/notes" -H "server_name: other"
Получим 200 OK с телом:
{"done": true, "result": [
{"id": 1, "content": "111", "author_id": 1},
{"id": 2, "content": "222", "author_id": 2}
]}
Модифицируем заметку 2 от имени админа:
curl -X PUT --location "http://0.0.0.0:8888/notes/2?token=admin_token" \
-H "Content-Type: application/json" \
-d '{"content": "qqq"}'
Получим 200 OK с телом:
{"done": true, "result": {"id": 2, "content": "qqq", "author_id": 2}}
Запросим список заметок от имени читателя:
curl "http://0.0.0.0:8888/notes?token=reader_token"
Получим 200 OK с телом:
{"done": true, "result": [
{"id": 1, "content": "111", "author_id": 1},
{"id": 2, "content": "qqq", "author_id": 2}
]}
Попробуем модифицировать заметку 2 от имени читателя:
curl -X PUT --location "http://0.0.0.0:8888/notes/2?token=reader_token" \
-H "Content-Type: application/json" \
-d '{"content": "fail"}'
Получим 403 Forbidden с телом, указывающим, что есть два варианта авторизации(серверный скрыт из-за do_log=False),
в первом пользователю нехватает скоупа update_any, во втором - update_own
{
"error": {
"api_key_in_query_token[update_any]": "Forbidden('missing scopes: update_any')",
"api_key_in_query_token[update_own]": "Forbidden('missing scopes: update_own')"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Попробуем модифицировать заметку 2 от имени первого пользователя (у него ведь есть скоуп update_own):
curl -X PUT --location "http://0.0.0.0:8888/notes/2?token=first_token" \
-H "Content-Type: application/json" \
-d '{"content": "fail"}'
Получим 404 Not Found с телом, указывающим, что такой заявки нет
(она невидима первого пользователя, т.к. принадлежит второму пользователю):
{
"error": "No row was found when one was required",
"error_type": "NoResultFound",
"error_code": null,
"done": false
}
Модифицируем заметку 1 от имени первого пользователя:
curl -X PUT --location "http://0.0.0.0:8888/notes/1?token=first_token" \
-H "Content-Type: application/json" \
-d '{"content": "success"}'
Получим 404 Not Found с телом, указывающим, что такой заявки нет
(она невидима первого пользователя, т.к. принадлежит второму пользователю):
{"done": true, "result": {"id": 1, "content": "success", "author_id": 1}}
Скопируем заметку 1 от имени админа:
curl -X POST --location "http://0.0.0.0:8888/notes/1?token=admin_token"
Получим 200 OK с телом:
{"done": true, "result": {"id": 3, "content": "success", "author_id": 3}}
Удалим первоую заявку от имени админа:
curl -X DELETE --location "http://0.0.0.0:8888/notes/1?token=admin_token"
Получим 200 OK с телом:
{"done": true}
Попробуем скопировать заметку 3 от имени читателя:
curl -X POST --location "http://0.0.0.0:8888/notes/3?token=reader_token"
Получим 403 Forbidden с телом, указывающим, что есть два варианта авторизации(серверный скрыт из-за do_log=False),
в первом пользователю нехватает скоупа create, во втором - скоупов create и read_own
{
"error": {
"api_key_in_query_token[create,read_any]": "Forbidden('missing scopes: create')",
"api_key_in_query_token[create,read_own]": "Forbidden('missing scopes: create, read_own')"
},
"error_type": "ErrorGroup",
"error_code": null,
"done": false
}
Импортируем заметку на основе файла от имени сервера:
curl -X POST --location "http://0.0.0.0:8888/notes/import" \
-H "server_name: other" \
-H "Content-Type: text/csv" \
-d 'test,passed'
Получим 200 OK с телом, содержащим id новых заметок:
{"done": true, "result": [4]}
Выгрузим все заметки от имени читателя:
curl -v "http://0.0.0.0:8888/notes/export?token=reader_token"
Получим 200 OK с CSV телом с тремя заметками:
* Trying 0.0.0.0:8888...
* Connected to 0.0.0.0 (...) port 8888 (#0)
> GET /notes/export?token=reader_token HTTP/1.1
> Host: 0.0.0.0:8888
> User-Agent: curl/...
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Disposition: attachment; filename="notes.csv"
< Content-Type: text/csv; charset=UTF-8
< X-Request-ID: ...
< Instance: readme
< Content-Length: 51
< Date: ...
< Server: Python/... aiohttp/...
<
id,content,author_id
2,qqq,2
3,success,3
4,11,
* Connection #0 to host 0.0.0.0 left intact