mentortools/libs/: async-tools-abm-2.1.69283 metadata and description

Tools to control async code execution

author Mike Orlov
author_email m.orlov@abm-jsc.ru
classifiers
  • Programming Language :: Python :: 3
  • Programming Language :: Python :: 3.9
  • Programming Language :: Python :: 3.10
  • Programming Language :: Python :: 3.11
description_content_type text/markdown
requires_python >=3.9,<4.0
Files
  • async_tools_abm-2.1.69283-py3-none-any.whl (Python Wheel, Python 3, 10 KB)
  • async_tools_abm-2.1.69283.tar.gz (Source, 6 KB)

Async tools

Tools to control async code execution

acall

Calls a sync or async callable the same way and returns its result, so the caller does not need to know which kind it was given.

Example:

import asyncio
from async_tools import acall

async def async_process():
    return 1

def sync_process():
    return 2

async def work(processor):
    # Return the result so the assertions below can compare it
    return await acall(processor)

assert asyncio.run(work(async_process)) == 1
assert asyncio.run(work(sync_process)) == 2
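
For readers curious how a helper like this can work, here is a minimal sketch. It is not the library's implementation: acall_sketch is a hypothetical name, and accepting extra positional and keyword arguments is an assumption. The idea is to call the object first and await the result only if it turns out to be awaitable.

```python
import asyncio
import inspect


async def acall_sketch(func, *args, **kwargs):
    """Hypothetical stand-in for acall: call func, await only if needed."""
    result = func(*args, **kwargs)
    # Coroutine functions return an awaitable; plain functions return a value
    if inspect.isawaitable(result):
        result = await result
    return result


async def async_process():
    return 1


def sync_process():
    return 2


async def work(processor):
    return await acall_sketch(processor)


async_result = asyncio.run(work(async_process))
sync_result = asyncio.run(work(sync_process))
```

Checking the returned value rather than the function itself also covers sync callables that happen to return a coroutine or a future.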

AsyncOnStart, AsyncOnStop

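Base classes for objects that hold async resources. A subclass implements _on_start or _on_stop, and, as the example shows, AsyncOnStart.start_if_necessary and AsyncOnStop.stop_if_necessary are awaited on a container of components to run those hooks.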

Example:

import dataclasses
import asyncio
from async_tools import AsyncOnStart, AsyncOnStop
from typing import NoReturn


class RequestOnStartup(AsyncOnStart):
    def __init__(self, requester):
        self.requester = requester
        self.outer_configuration = None

    async def _on_start(self):
        self.outer_configuration = await self.requester.get('some url')


class SaveLogOnShutdown(AsyncOnStop):
    def __init__(self, database):
        self.database = database
        self.log = []

    async def _on_stop(self):
        self.database.save(self.log)

class Dummy:
    pass

@dataclasses.dataclass
class ControllerComponents:
    part1: RequestOnStartup
    part2: SaveLogOnShutdown
    part3: Dummy

class Controller:
    def __init__(self):
        requester = ...
        database = ...
        self.components = ControllerComponents(
            RequestOnStartup(requester), 
            SaveLogOnShutdown(database),
            Dummy()
        )
        
    async def run(self):
        await self.init()
        try:
            await self.do_work()
        finally:
            # do_work never returns normally, so clean up on error or cancellation
            await self.deinit()
    
    async def init(self):
        await AsyncOnStart.start_if_necessary(self.components)
        ...
    
    async def do_work(self) -> NoReturn:
        ...
        
    async def deinit(self):
        await AsyncOnStop.stop_if_necessary(self.components)
        ...

asyncio.run(Controller().run())
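
The example above passes a dataclass that mixes startable and plain components, which suggests that start_if_necessary skips members such as Dummy. The sketch below shows one way that behaviour could be achieved. It is an assumption for illustration, not the library's code: StartableSketch, LoadsConfig and Components are hypothetical names, and the real AsyncOnStart may discover components differently.

```python
import asyncio
import dataclasses


class StartableSketch:
    """Hypothetical stand-in for AsyncOnStart."""

    async def _on_start(self):
        raise NotImplementedError

    @staticmethod
    async def start_if_necessary(components):
        # Walk the dataclass fields and start only the members that opt in
        for field in dataclasses.fields(components):
            part = getattr(components, field.name)
            if isinstance(part, StartableSketch):
                await part._on_start()


class LoadsConfig(StartableSketch):
    def __init__(self):
        self.config = None

    async def _on_start(self):
        await asyncio.sleep(0)  # placeholder for a real async request
        self.config = {"loaded": True}


class Dummy:
    pass


@dataclasses.dataclass
class Components:
    part1: LoadsConfig
    part2: Dummy


components = Components(LoadsConfig(), Dummy())
asyncio.run(StartableSketch.start_if_necessary(components))
```

A matching stop helper would follow the same pattern with an _on_stop hook.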