Асинхронное программирование в Python
В этой статье мы рассмотрим асинхронное программирование в Python для создания высокопроизводительных ботов, включая asyncio, async/await, конкурентность и практические примеры.Содержание
- Основы асинхронности
- asyncio и async/await
- Конкурентность и параллелизм
- Асинхронные боты
- Обработка ошибок
- Производительность и оптимизация
- Практические примеры
Основы асинхронности
Синхронное vs Асинхронное программирование
# Синхронный код
import time
import requests
def sync_fetch_data(urls):
"""Синхронное получение данных"""
results = []
for url in urls:
print(f"Запрос к {url}")
response = requests.get(url)
results.append(response.json())
print(f"Получен ответ от {url}")
return results
# Измерение времени
start_time = time.time()
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/defunkt",
"https://api.github.com/users/mojombo"
]
results = sync_fetch_data(urls)
end_time = time.time()
print(f"Синхронное выполнение заняло: {end_time - start_time:.2f} секунд")# Асинхронный код
import asyncio
import aiohttp
import time
async def async_fetch_data(urls):
"""Асинхронное получение данных"""
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
task = fetch_single_url(session, url)
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
async def fetch_single_url(session, url):
"""Получение данных с одного URL"""
print(f"Запрос к {url}")
async with session.get(url) as response:
result = await response.json()
print(f"Получен ответ от {url}")
return result
# Измерение времени
async def main():
start_time = time.time()
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/defunkt",
"https://api.github.com/users/mojombo"
]
results = await async_fetch_data(urls)
end_time = time.time()
print(f"Асинхронное выполнение заняло: {end_time - start_time:.2f} секунд")
# Запуск
asyncio.run(main())Ключевые концепции
# Основные концепции асинхронности
import asyncio
# 1. Coroutine (корутина)
async def simple_coroutine():
"""Простая корутина"""
print("Начало корутины")
await asyncio.sleep(1) # Асинхронная задержка
print("Конец корутины")
return "Результат"
# 2. Event Loop (цикл событий)
async def event_loop_example():
"""Пример работы с циклом событий"""
loop = asyncio.get_event_loop()
print(f"Текущий цикл событий: {loop}")
# Запуск корутины
result = await simple_coroutine()
print(f"Результат: {result}")
# 3. Task (задача)
async def task_example():
"""Пример работы с задачами"""
# Создание задачи
task = asyncio.create_task(simple_coroutine())
# Ожидание завершения
result = await task
print(f"Задача завершена: {result}")
# 4. Future (будущее значение)
async def future_example():
"""Пример работы с Future"""
loop = asyncio.get_event_loop()
future = loop.create_future()
# Установка результата
future.set_result("Результат Future")
# Получение результата
result = await future
print(f"Future результат: {result}")
# Запуск примеров
asyncio.run(event_loop_example())
asyncio.run(task_example())
asyncio.run(future_example())asyncio и async/await
Основы asyncio
# Основные функции asyncio
import asyncio
import random
async def async_function(name, delay):
"""Асинхронная функция с задержкой"""
print(f"{name} начал выполнение")
await asyncio.sleep(delay)
print(f"{name} завершил выполнение")
return f"Результат {name}"
async def basic_asyncio_examples():
"""Основные примеры использования asyncio"""
# 1. Простой await
print("=== Простой await ===")
result = await async_function("Функция 1", 1)
print(f"Результат: {result}")
# 2. asyncio.gather() - параллельное выполнение
print("\n=== asyncio.gather() ===")
tasks = [
async_function("Задача 1", 1),
async_function("Задача 2", 2),
async_function("Задача 3", 1.5)
]
results = await asyncio.gather(*tasks)
print(f"Результаты: {results}")
# 3. asyncio.wait() - ожидание с условиями
print("\n=== asyncio.wait() ===")
tasks = [asyncio.create_task(async_function(f"Задача {i}", random.uniform(0.5, 2)) for i in range(5)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"Завершено: {len(done)}, Ожидает: {len(pending)}")
# 4. asyncio.as_completed() - получение результатов по мере готовности
print("\n=== asyncio.as_completed() ===")
tasks = [async_function(f"Задача {i}", random.uniform(0.5, 2)) for i in range(3)]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Получен результат: {result}")
# Запуск
asyncio.run(basic_asyncio_examples())Контекстные менеджеры
# Асинхронные контекстные менеджеры
import asyncio
import aiohttp
import aiofiles
class AsyncResourceManager:
"""Асинхронный контекстный менеджер"""
async def __aenter__(self):
"""Вход в контекст"""
print("Инициализация ресурса")
self.resource = "Ресурс создан"
return self.resource
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Выход из контекста"""
print("Очистка ресурса")
if exc_type:
print(f"Ошибка в контексте: {exc_val}")
async def context_manager_example():
"""Пример использования асинхронного контекстного менеджера"""
async with AsyncResourceManager() as resource:
print(f"Используем ресурс: {resource}")
await asyncio.sleep(1)
print("Работа с ресурсом завершена")
# Примеры с реальными библиотеками
async def aiohttp_example():
"""Пример с aiohttp"""
async with aiohttp.ClientSession() as session:
async with session.get('https://api.github.com/users/octocat') as response:
data = await response.json()
print(f"GitHub пользователь: {data['login']}")
async def aiofiles_example():
"""Пример с aiofiles"""
async with aiofiles.open('test.txt', 'w') as f:
await f.write('Асинхронная запись в файл')
async with aiofiles.open('test.txt', 'r') as f:
content = await f.read()
print(f"Содержимое файла: {content}")
# Запуск примеров
asyncio.run(context_manager_example())
asyncio.run(aiohttp_example())
asyncio.run(aiofiles_example())Конкурентность и параллелизм
Семафоры и ограничения
# Семафоры для ограничения конкурентности
import asyncio
import aiohttp
import time
class RateLimiter:
"""Ограничитель скорости запросов"""
def __init__(self, rate_limit=10, time_window=60):
self.rate_limit = rate_limit
self.time_window = time_window
self.requests = []
self.semaphore = asyncio.Semaphore(rate_limit)
async def acquire(self):
"""Получение разрешения на запрос"""
await self.semaphore.acquire()
now = time.time()
# Удаляем старые запросы
self.requests = [req_time for req_time in self.requests if now - req_time < self.time_window]
if len(self.requests) >= self.rate_limit:
# Ждем освобождения слота
await asyncio.sleep(self.time_window - (now - self.requests[0]))
self.requests.append(now)
def release(self):
"""Освобождение семафора"""
self.semaphore.release()
async def make_request(session, url, rate_limiter):
"""Выполнение запроса с ограничением скорости"""
await rate_limiter.acquire()
try:
async with session.get(url) as response:
data = await response.json()
return data
finally:
rate_limiter.release()
async def rate_limited_requests():
"""Пример запросов с ограничением скорости"""
rate_limiter = RateLimiter(rate_limit=5, time_window=10)
async with aiohttp.ClientSession() as session:
urls = [f"https://api.github.com/users/user{i}" for i in range(20)]
tasks = [make_request(session, url, rate_limiter) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
print(f"Выполнено {len(results)} запросов")
return results
# Запуск
asyncio.run(rate_limited_requests())Очереди и производители/потребители
# Асинхронные очереди
import asyncio
import random
import time
async def producer(queue, name, items):
"""Производитель данных"""
for i in range(items):
item = f"Элемент {i} от {name}"
await queue.put(item)
print(f"{name} добавил: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# Сигнал завершения
await queue.put(None)
async def consumer(queue, name):
"""Потребитель данных"""
while True:
item = await queue.get()
if item is None:
break
print(f"{name} обработал: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def queue_example():
"""Пример работы с очередями"""
queue = asyncio.Queue(maxsize=5)
# Создание задач
producers = [
asyncio.create_task(producer(queue, "Производитель 1", 5)),
asyncio.create_task(producer(queue, "Производитель 2", 3))
]
consumers = [
asyncio.create_task(consumer(queue, "Потребитель 1")),
asyncio.create_task(consumer(queue, "Потребитель 2"))
]
# Ожидание завершения производителей
await asyncio.gather(*producers)
# Сигнал завершения для потребителей
await queue.put(None)
await queue.put(None)
# Ожидание завершения потребителей
await asyncio.gather(*consumers)
# Запуск
asyncio.run(queue_example())Асинхронные боты
Telegram Bot с asyncio
# Асинхронный Telegram бот
import asyncio
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
class AsyncTelegramBot:
"""Асинхронный Telegram бот"""
def __init__(self, token):
self.token = token
self.application = Application.builder().token(token).build()
self.setup_handlers()
def setup_handlers(self):
"""Настройка обработчиков"""
# Команды
self.application.add_handler(CommandHandler("start", self.start_command))
self.application.add_handler(CommandHandler("help", self.help_command))
self.application.add_handler(CommandHandler("async", self.async_command))
# Сообщения
self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /start"""
await update.message.reply_text(
"Привет! Я асинхронный бот. Используй /async для демонстрации асинхронности."
)
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /help"""
help_text = """
Доступные команды:
/start - Начать работу с ботом
/help - Показать это сообщение
/async - Демонстрация асинхронности
"""
await update.message.reply_text(help_text)
async def async_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Демонстрация асинхронности"""
message = await update.message.reply_text("Начинаем асинхронные задачи...")
# Создание асинхронных задач
tasks = [
self.async_task("Задача 1", 2),
self.async_task("Задача 2", 3),
self.async_task("Задача 3", 1.5)
]
# Параллельное выполнение
results = await asyncio.gather(*tasks)
# Отправка результатов
result_text = "Результаты асинхронных задач:\n" + "\n".join(results)
await message.edit_text(result_text)
async def async_task(self, name, delay):
"""Асинхронная задача"""
await asyncio.sleep(delay)
return f"{name} завершена за {delay} секунд"
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка текстовых сообщений"""
user_message = update.message.text
user_id = update.message.from_user.id
# Асинхронная обработка сообщения
response = await self.process_message_async(user_message, user_id)
await update.message.reply_text(response)
async def process_message_async(self, message, user_id):
"""Асинхронная обработка сообщения"""
# Имитация асинхронной обработки
await asyncio.sleep(0.5)
# Простая обработка
if "привет" in message.lower():
return "Привет! Как дела?"
elif "время" in message.lower():
import datetime
return f"Текущее время: {datetime.datetime.now().strftime('%H:%M:%S')}"
else:
return f"Вы написали: {message}"
async def run(self):
"""Запуск бота"""
logger.info("Запуск асинхронного Telegram бота...")
await self.application.run_polling()
# Использование
async def main():
bot = AsyncTelegramBot("YOUR_BOT_TOKEN")
await bot.run()
# Запуск
if __name__ == "__main__":
asyncio.run(main())Discord Bot с asyncio
# Асинхронный Discord бот
import asyncio
import discord
from discord.ext import commands
import aiohttp
import json
class AsyncDiscordBot:
"""Асинхронный Discord бот"""
def __init__(self, token):
self.token = token
self.bot = commands.Bot(command_prefix='!', intents=discord.Intents.all())
self.setup_commands()
def setup_commands(self):
"""Настройка команд"""
@self.bot.command(name='async')
async def async_command(ctx):
"""Демонстрация асинхронности"""
message = await ctx.send("Выполняем асинхронные задачи...")
# Создание асинхронных задач
tasks = [
self.fetch_user_data("octocat"),
self.fetch_user_data("defunkt"),
self.fetch_user_data("mojombo")
]
# Параллельное выполнение
results = await asyncio.gather(*tasks, return_exceptions=True)
# Отправка результатов
embed = discord.Embed(title="Результаты асинхронных запросов", color=0x00ff00)
for i, result in enumerate(results):
if isinstance(result, Exception):
embed.add_field(name=f"Задача {i+1}", value=f"Ошибка: {result}", inline=False)
else:
embed.add_field(name=f"Задача {i+1}", value=f"Пользователь: {result['login']}", inline=False)
await message.edit(content="", embed=embed)
@self.bot.command(name='ping')
async def ping_command(ctx):
"""Проверка задержки"""
latency = self.bot.latency * 1000
await ctx.send(f"Понг! Задержка: {latency:.2f}мс")
@self.bot.event
async def on_ready():
"""Событие готовности бота"""
print(f'{self.bot.user} подключен к Discord!')
async def fetch_user_data(self, username):
"""Получение данных пользователя GitHub"""
async with aiohttp.ClientSession() as session:
async with session.get(f'https://api.github.com/users/{username}') as response:
if response.status == 200:
return await response.json()
else:
raise Exception(f"Ошибка получения данных для {username}")
async def run(self):
"""Запуск бота"""
await self.bot.start(self.token)
# Использование
async def main():
bot = AsyncDiscordBot("YOUR_DISCORD_TOKEN")
await bot.run()
# Запуск
if __name__ == "__main__":
asyncio.run(main())Обработка ошибок
Асинхронная обработка ошибок
# Обработка ошибок в асинхронном коде
import asyncio
import aiohttp
import logging
logger = logging.getLogger(__name__)
class AsyncErrorHandler:
"""Обработчик ошибок для асинхронного кода"""
@staticmethod
async def safe_request(url, session):
"""Безопасный запрос с обработкой ошибок"""
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
raise aiohttp.ClientResponseError(
request_info=response.request_info,
history=response.history,
status=response.status
)
except asyncio.TimeoutError:
logger.error(f"Таймаут запроса к {url}")
return None
except aiohttp.ClientError as e:
logger.error(f"Ошибка клиента для {url}: {e}")
return None
except Exception as e:
logger.error(f"Неожиданная ошибка для {url}: {e}")
return None
@staticmethod
async def retry_request(url, session, max_retries=3, delay=1):
"""Запрос с повторными попытками"""
for attempt in range(max_retries):
try:
result = await AsyncErrorHandler.safe_request(url, session)
if result is not None:
return result
if attempt < max_retries - 1:
logger.info(f"Повторная попытка {attempt + 1} для {url}")
await asyncio.sleep(delay (2 * attempt)) # Экспоненциальная задержка
except Exception as e:
logger.error(f"Попытка {attempt + 1} не удалась для {url}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(delay (2 * attempt))
logger.error(f"Все попытки исчерпаны для {url}")
return None
async def error_handling_example():
"""Пример обработки ошибок"""
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/nonexistent",
"https://invalid-url.com",
"https://api.github.com/users/defunkt"
]
async with aiohttp.ClientSession() as session:
tasks = [AsyncErrorHandler.retry_request(url, session) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"URL {i+1}: Ошибка - {result}")
elif result is None:
print(f"URL {i+1}: Не удалось получить данные")
else:
print(f"URL {i+1}: Успешно - {result.get('login', 'N/A')}")
# Запуск
asyncio.run(error_handling_example())Circuit Breaker паттерн
# Circuit Breaker для асинхронного кода
import asyncio
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class AsyncCircuitBreaker:
"""Асинхронный Circuit Breaker"""
def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func, args, *kwargs):
"""Вызов функции через Circuit Breaker"""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit Breaker is OPEN")
try:
result = await func(args, *kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self):
"""Проверка возможности сброса"""
return (time.time() - self.last_failure_time) >= self.recovery_timeout
def _on_success(self):
"""Обработка успешного вызова"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Обработка неудачного вызова"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Пример использования
async def unreliable_service():
"""Ненадежный сервис"""
import random
if random.random() < 0.7: # 70% вероятность ошибки
raise Exception("Сервис недоступен")
return "Данные получены"
async def circuit_breaker_example():
"""Пример использования Circuit Breaker"""
breaker = AsyncCircuitBreaker(failure_threshold=3, recovery_timeout=5)
for i in range(10):
try:
result = await breaker.call(unreliable_service)
print(f"Попытка {i+1}: {result}")
except Exception as e:
print(f"Попытка {i+1}: {e}")
await asyncio.sleep(1)
# Запуск
asyncio.run(circuit_breaker_example())Производительность и оптимизация
Профилирование асинхронного кода
# Профилирование асинхронного кода
import asyncio
import time
import cProfile
import pstats
from functools import wraps
def async_profile(func):
"""Декоратор для профилирования асинхронных функций"""
@wraps(func)
async def wrapper(args, *kwargs):
profiler = cProfile.Profile()
profiler.enable()
start_time = time.time()
result = await func(args, *kwargs)
end_time = time.time()
profiler.disable()
print(f"Функция {func.__name__} выполнена за {end_time - start_time:.2f} секунд")
# Сохранение результатов профилирования
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # Топ 10 функций
return result
return wrapper
class AsyncProfiler:
"""Профилировщик для асинхронного кода"""
def __init__(self):
self.timings = {}
self.call_counts = {}
def time_function(self, func_name):
"""Декоратор для измерения времени выполнения"""
def decorator(func):
@wraps(func)
async def wrapper(args, *kwargs):
start_time = time.time()
result = await func(args, *kwargs)
end_time = time.time()
execution_time = end_time - start_time
self.timings[func_name] = self.timings.get(func_name, 0) + execution_time
self.call_counts[func_name] = self.call_counts.get(func_name, 0) + 1
return result
return wrapper
return decorator
def print_stats(self):
"""Вывод статистики"""
print("\n=== Статистика производительности ===")
for func_name, total_time in self.timings.items():
call_count = self.call_counts[func_name]
avg_time = total_time / call_count
print(f"{func_name}: {total_time:.3f}с (всего), {avg_time:.3f}с (среднее), {call_count} вызовов")
# Пример использования
profiler = AsyncProfiler()
@profiler.time_function("async_task")
async def async_task(delay):
"""Асинхронная задача"""
await asyncio.sleep(delay)
return f"Задача выполнена за {delay} секунд"
@async_profile
async def performance_example():
"""Пример измерения производительности"""
tasks = [async_task(0.1) for _ in range(100)]
results = await asyncio.gather(*tasks)
return results
# Запуск
asyncio.run(performance_example())
profiler.print_stats()Оптимизация памяти
# Оптимизация памяти в асинхронном коде
import asyncio
import gc
import tracemalloc
from typing import List, Dict, Any
class MemoryOptimizedProcessor:
"""Процессор с оптимизацией памяти"""
def __init__(self, batch_size=1000):
self.batch_size = batch_size
self.processed_count = 0
async def process_large_dataset(self, data: List[Dict[str, Any]]):
"""Обработка большого набора данных батчами"""
results = []
for i in range(0, len(data), self.batch_size):
batch = data[i:i + self.batch_size]
batch_results = await self.process_batch(batch)
results.extend(batch_results)
# Принудительная сборка мусора
if i % (self.batch_size * 10) == 0:
gc.collect()
# Логирование прогресса
if i % (self.batch_size * 100) == 0:
print(f"Обработано {i} элементов")
return results
async def process_batch(self, batch: List[Dict[str, Any]]):
"""Обработка батча данных"""
tasks = [self.process_item(item) for item in batch]
return await asyncio.gather(*tasks)
async def process_item(self, item: Dict[str, Any]):
"""Обработка одного элемента"""
# Имитация обработки
await asyncio.sleep(0.001)
return {"processed": True, "id": item.get("id")}
async def memory_optimization_example():
"""Пример оптимизации памяти"""
# Включение трассировки памяти
tracemalloc.start()
# Создание большого набора данных
large_dataset = [{"id": i, "data": f"item_{i}"} for i in range(10000)]
# Обработка с оптимизацией памяти
processor = MemoryOptimizedProcessor(batch_size=500)
results = await processor.process_large_dataset(large_dataset)
# Получение статистики памяти
current, peak = tracemalloc.get_traced_memory()
print(f"Текущее использование памяти: {current / 1024 / 1024:.2f} MB")
print(f"Пиковое использование памяти: {peak / 1024 / 1024:.2f} MB")
tracemalloc.stop()
return results
# Запуск
asyncio.run(memory_optimization_example())Практические примеры
Асинхронный веб-скрапер
# Асинхронный веб-скрапер
import asyncio
import aiohttp
import aiofiles
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import time
class AsyncWebScraper:
"""Асинхронный веб-скрапер"""
def __init__(self, max_concurrent=10):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self.visited_urls = set()
self.scraped_data = []
async def scrape_url(self, session, url):
"""Скрапинг одного URL"""
async with self.semaphore:
try:
async with session.get(url) as response:
if response.status == 200:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
# Извлечение данных
data = {
'url': url,
'title': soup.title.string if soup.title else '',
'links': [urljoin(url, link.get('href')) for link in soup.find_all('a', href=True)],
'text': soup.get_text()[:500] # Первые 500 символов
}
return data
else:
return None
except Exception as e:
print(f"Ошибка при скрапинге {url}: {e}")
return None
async def scrape_multiple_urls(self, urls):
"""Скрапинг нескольких URL"""
async with aiohttp.ClientSession() as session:
tasks = [self.scrape_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Фильтрация результатов
valid_results = [result for result in results if result is not None and not isinstance(result, Exception)]
return valid_results
async def save_results(self, results, filename):
"""Сохранение результатов в файл"""
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
for result in results:
await f.write(f"URL: {result['url']}\n")
await f.write(f"Title: {result['title']}\n")
await f.write(f"Text: {result['text']}\n")
await f.write("-" * 50 + "\n")
async def web_scraper_example():
"""Пример использования веб-скрапера"""
urls = [
"https://httpbin.org/html",
"https://httpbin.org/json",
"https://httpbin.org/xml",
"https://httpbin.org/robots.txt"
]
scraper = AsyncWebScraper(max_concurrent=5)
start_time = time.time()
results = await scraper.scrape_multiple_urls(urls)
end_time = time.time()
print(f"Скрапинг завершен за {end_time - start_time:.2f} секунд")
print(f"Получено {len(results)} результатов")
# Сохранение результатов
await scraper.save_results(results, "scraped_data.txt")
return results
# Запуск
asyncio.run(web_scraper_example())Асинхронный API клиент
# Асинхронный API клиент
import asyncio
import aiohttp
import json
from typing import Dict, List, Optional
class AsyncAPIClient:
"""Асинхронный API клиент"""
def __init__(self, base_url: str, api_key: Optional[str] = None):
self.base_url = base_url
self.api_key = api_key
self.session = None
async def __aenter__(self):
"""Асинхронный контекстный менеджер - вход"""
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Асинхронный контекстный менеджер - выход"""
if self.session:
await self.session.close()
async def get(self, endpoint: str, params: Dict = None) -> Dict:
"""GET запрос"""
url = f"{self.base_url}/{endpoint}"
headers = {}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
async with self.session.get(url, params=params, headers=headers) as response:
if response.status == 200:
return await response.json()
else:
raise Exception(f"API ошибка: {response.status}")
async def post(self, endpoint: str, data: Dict) -> Dict:
"""POST запрос"""
url = f"{self.base_url}/{endpoint}"
headers = {'Content-Type': 'application/json'}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
async with self.session.post(url, json=data, headers=headers) as response:
if response.status in [200, 201]:
return await response.json()
else:
raise Exception(f"API ошибка: {response.status}")
async def batch_requests(self, requests: List[Dict]) -> List[Dict]:
"""Пакетные запросы"""
tasks = []
for req in requests:
if req['method'] == 'GET':
task = self.get(req['endpoint'], req.get('params'))
elif req['method'] == 'POST':
task = self.post(req['endpoint'], req.get('data'))
else:
continue
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def api_client_example():
"""Пример использования API клиента"""
async with AsyncAPIClient("https://api.github.com") as client:
# Одиночные запросы
user_data = await client.get("users/octocat")
print(f"Пользователь: {user_data['login']}")
# Пакетные запросы
requests = [
{"method": "GET", "endpoint": "users/octocat"},
{"method": "GET", "endpoint": "users/defunkt"},
{"method": "GET", "endpoint": "users/mojombo"}
]
results = await client.batch_requests(requests)
for result in results:
if isinstance(result, Exception):
print(f"Ошибка: {result}")
else:
print(f"Пользователь: {result['login']}")
# Запуск
asyncio.run(api_client_example())Заключение
В этой статье мы рассмотрели асинхронное программирование в Python:- ✅ Основы асинхронности и asyncio
- ✅ Конкурентность и параллелизм
- ✅ Создание асинхронных ботов
- ✅ Обработка ошибок и паттерны
- ✅ Производительность и оптимизация
- ✅ Практические примеры
Полезные ссылки
365 просмотров
2 лайков
0 комментариев
Комментарии (0)
Пока нет комментариев. Будьте первым!