aiohttp
Пример асинхронного клиента с поддержкой асинхронного контекстного менеджера (async with):
import aiohttp
import logging
import typing
import urllib.parse
class HTTPClient:
    """Asynchronous HTTP client built on ``aiohttp`` with ``async with`` support.

    Entering the context opens a session bound to ``base_url``; leaving the
    context closes that session.
    """

    USER_AGENT = 'My user agent'
    base_url: str

    def __init__(self, base_url: str, logger: logging.Logger) -> None:
        """Store the client settings; the session itself is created by ``connect``.

        Args:
            base_url: Base URL handed to the underlying ``aiohttp.ClientSession``.
            logger: Logger used to report failed requests. ``None`` is
                tolerated and replaced with a module-level logger.
        """
        self.base_url = base_url
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        # Defined up front so that closing a never-connected client is a
        # no-op instead of an AttributeError.
        self._session = None

    async def __aenter__(self):
        """Open the session and return the client itself."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session if one was opened; exceptions are not suppressed."""
        if self._session is not None:
            await self._session.close()

    async def connect(self):
        """Create the ``aiohttp.ClientSession`` used for all requests.

        The session uses a ``TCPConnector`` created with ``force_close=True``
        and sends ``USER_AGENT`` as the ``User-Agent`` header.
        """
        connector = aiohttp.TCPConnector(force_close=True)
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            connector=connector,
            headers={
                'User-Agent': self.USER_AGENT,
            },
        )

    @staticmethod
    def urlencode(data: typing.Dict) -> str:
        """Return ``data`` encoded as a URL query string."""
        return urllib.parse.urlencode(data)

    async def get_info(self, id: str, search: str) -> typing.List[str]:
        """Request ``/api/{id}`` with a ``search`` query parameter.

        Args:
            id: Identifier interpolated into the request path as-is.
            search: Value sent as the ``search`` query parameter.

        Returns:
            The decoded JSON body when the response status is OK, otherwise
            an empty list (the failure is logged).
        """
        query = {
            'search': search,
        }
        async with self._session.get(f'/api/{id}?{HTTPClient.urlencode(query)}') as response:
            if response.ok:
                return await response.json()
            # Report through the injected logger rather than stdout.
            self.logger.warning(
                'Request to %s failed with status %s', response.url, response.status
            )
            return []
Пример GraphQL-клиента:
import aiohttp
import json
class GQLClient:
    """Asynchronous GraphQL client built on ``aiohttp`` with ``async with`` support.

    Entering the context opens a session bound to ``base_url``; leaving the
    context closes that session.
    """

    USER_AGENT = 'My user agent'
    base_url: str

    def __init__(self, base_url: str) -> None:
        """Store the base URL; the session itself is created by ``connect``."""
        self.base_url = base_url
        # Defined up front so that closing a never-connected client is a
        # no-op instead of an AttributeError.
        self._session = None

    async def __aenter__(self):
        """Open the session and return the client itself."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session if one was opened; exceptions are not suppressed."""
        if self._session is not None:
            await self._session.close()

    async def connect(self):
        """Create the ``aiohttp.ClientSession`` with JSON request headers."""
        headers = {
            'User-Agent': self.USER_AGENT,
            "Content-Type": "application/json",
        }
        self._session = aiohttp.ClientSession(base_url=self.base_url, headers=headers)

    async def query_url(self, query, *, variables=None, **kwargs):
        """POST a GraphQL query to ``/api`` and return the ``data`` member.

        Args:
            query: GraphQL document sent under the ``query`` key.
            variables: Optional mapping sent under the ``variables`` key.
                When omitted, the request body contains only ``query``.
            **kwargs: Extra keyword arguments forwarded to the session's
                ``post`` call.

        Returns:
            The value stored under ``"data"`` in the decoded JSON response.

        Raises:
            KeyError: If the decoded response has no ``"data"`` key.
        """
        payload = {"query": query}
        if variables is not None:
            payload["variables"] = variables
        async with self._session.post(
            "/api", data=json.dumps(payload), **kwargs
        ) as resp:
            return json.loads(await resp.read())["data"]
Пример использования подобного клиента:
import asyncio


async def main():
    """Fetch one record through ``HTTPClient`` as a usage demonstration."""
    # ``async with`` is only valid inside a coroutine, hence this wrapper.
    async with HTTPClient("https://example.com", logging.getLogger(__name__)) as http_client:
        data = await http_client.get_info("1124", "search smth")
    return data


if __name__ == "__main__":
    asyncio.run(main())
Last updated