aiohttp

ΠŸΡ€ΠΈΠΌΠ΅Ρ€ асинхронного ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° с ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ контСкста

import aiohttp
import logging
import typing
import urllib.parse


class HTTPClient:
    USER_AGENT = 'My user agent'
    base_url: str

    def __init__(self, base_url: str, logger: logging.Logger) -> None:
        self.base_url = base_url
        self.logger = logger

    async def __aenter__(self):
        await self.connect()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._session.close()

    async def connect(self):
        connector = aiohttp.TCPConnector(force_close=True)
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            connector=connector,
            headers={
                'User-Agent': self.USER_AGENT
            }
        )

    @staticmethod
    def urlencode(data: typing.Dict) -> str:
        return urllib.parse.urlencode(data)

    async def get_info(self, id: str, search: str) -> typing.List[str]:
        query = {
            'search': search
        }
        async with self._session.get(f'/api/{id}?{HTTPClient.urlencode(query)}') as response:
            if response.ok:
                data = await response.json()

                return data
            else:
                print(response)
                return list()

ΠŸΡ€ΠΈΠΌΠ΅Ρ€ GraphQL ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π°:

ΠŸΡ€ΠΈΠΌΠ΅Ρ€ использования ΠΏΠΎΠ΄ΠΎΠ±Π½ΠΎΠ³ΠΎ ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π°

Last updated