Асинхронное программирование в Python

В этой статье мы рассмотрим асинхронное программирование в Python для создания высокопроизводительных ботов, включая asyncio, async/await, конкурентность и практические примеры.

Содержание

Основы асинхронности

Синхронное vs Асинхронное программирование

# Синхронный код
import time
import requests

def sync_fetch_data(urls):
    """Синхронное получение данных"""
    results = []
    for url in urls:
        print(f"Запрос к {url}")
        response = requests.get(url)
        results.append(response.json())
        print(f"Получен ответ от {url}")
    return results

# Измерение времени
start_time = time.time()
urls = [
    "https://api.github.com/users/octocat",
    "https://api.github.com/users/defunkt",
    "https://api.github.com/users/mojombo"
]
results = sync_fetch_data(urls)
end_time = time.time()
print(f"Синхронное выполнение заняло: {end_time - start_time:.2f} секунд")
# Асинхронный код
import asyncio
import aiohttp
import time

async def async_fetch_data(urls):
    """Асинхронное получение данных"""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = fetch_single_url(session, url)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
        return results

async def fetch_single_url(session, url):
    """Получение данных с одного URL"""
    print(f"Запрос к {url}")
    async with session.get(url) as response:
        result = await response.json()
        print(f"Получен ответ от {url}")
        return result

# Измерение времени
async def main():
    start_time = time.time()
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/defunkt",
        "https://api.github.com/users/mojombo"
    ]
    results = await async_fetch_data(urls)
    end_time = time.time()
    print(f"Асинхронное выполнение заняло: {end_time - start_time:.2f} секунд")

# Запуск
asyncio.run(main())

Ключевые концепции

# Основные концепции асинхронности
import asyncio

# 1. Coroutine (корутина)
async def simple_coroutine():
    """Простая корутина"""
    print("Начало корутины")
    await asyncio.sleep(1)  # Асинхронная задержка
    print("Конец корутины")
    return "Результат"

# 2. Event Loop (цикл событий)
async def event_loop_example():
    """Пример работы с циклом событий"""
    loop = asyncio.get_event_loop()
    print(f"Текущий цикл событий: {loop}")
    
    # Запуск корутины
    result = await simple_coroutine()
    print(f"Результат: {result}")

# 3. Task (задача)
async def task_example():
    """Пример работы с задачами"""
    # Создание задачи
    task = asyncio.create_task(simple_coroutine())
    
    # Ожидание завершения
    result = await task
    print(f"Задача завершена: {result}")

# 4. Future (будущее значение)
async def future_example():
    """Пример работы с Future"""
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    
    # Установка результата
    future.set_result("Результат Future")
    
    # Получение результата
    result = await future
    print(f"Future результат: {result}")

# Запуск примеров
asyncio.run(event_loop_example())
asyncio.run(task_example())
asyncio.run(future_example())

asyncio и async/await

Основы asyncio

# Основные функции asyncio
import asyncio
import random

async def async_function(name, delay):
    """Асинхронная функция с задержкой"""
    print(f"{name} начал выполнение")
    await asyncio.sleep(delay)
    print(f"{name} завершил выполнение")
    return f"Результат {name}"

async def basic_asyncio_examples():
    """Основные примеры использования asyncio"""
    
    # 1. Простой await
    print("=== Простой await ===")
    result = await async_function("Функция 1", 1)
    print(f"Результат: {result}")
    
    # 2. asyncio.gather() - параллельное выполнение
    print("\n=== asyncio.gather() ===")
    tasks = [
        async_function("Задача 1", 1),
        async_function("Задача 2", 2),
        async_function("Задача 3", 1.5)
    ]
    results = await asyncio.gather(*tasks)
    print(f"Результаты: {results}")
    
    # 3. asyncio.wait() - ожидание с условиями
    print("\n=== asyncio.wait() ===")
    tasks = [asyncio.create_task(async_function(f"Задача {i}", random.uniform(0.5, 2)) for i in range(5)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Завершено: {len(done)}, Ожидает: {len(pending)}")
    
    # 4. asyncio.as_completed() - получение результатов по мере готовности
    print("\n=== asyncio.as_completed() ===")
    tasks = [async_function(f"Задача {i}", random.uniform(0.5, 2)) for i in range(3)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Получен результат: {result}")

# Запуск
asyncio.run(basic_asyncio_examples())

Контекстные менеджеры

# Асинхронные контекстные менеджеры
import asyncio
import aiohttp
import aiofiles

class AsyncResourceManager:
    """Асинхронный контекстный менеджер"""
    
    async def __aenter__(self):
        """Вход в контекст"""
        print("Инициализация ресурса")
        self.resource = "Ресурс создан"
        return self.resource
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Выход из контекста"""
        print("Очистка ресурса")
        if exc_type:
            print(f"Ошибка в контексте: {exc_val}")

async def context_manager_example():
    """Пример использования асинхронного контекстного менеджера"""
    async with AsyncResourceManager() as resource:
        print(f"Используем ресурс: {resource}")
        await asyncio.sleep(1)
        print("Работа с ресурсом завершена")

# Примеры с реальными библиотеками
async def aiohttp_example():
    """Пример с aiohttp"""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.github.com/users/octocat') as response:
            data = await response.json()
            print(f"GitHub пользователь: {data['login']}")

async def aiofiles_example():
    """Пример с aiofiles"""
    async with aiofiles.open('test.txt', 'w') as f:
        await f.write('Асинхронная запись в файл')
    
    async with aiofiles.open('test.txt', 'r') as f:
        content = await f.read()
        print(f"Содержимое файла: {content}")

# Запуск примеров
asyncio.run(context_manager_example())
asyncio.run(aiohttp_example())
asyncio.run(aiofiles_example())

Конкурентность и параллелизм

Семафоры и ограничения

# Семафоры для ограничения конкурентности
import asyncio
import aiohttp
import time

class RateLimiter:
    """Ограничитель скорости запросов"""
    
    def __init__(self, rate_limit=10, time_window=60):
        self.rate_limit = rate_limit
        self.time_window = time_window
        self.requests = []
        self.semaphore = asyncio.Semaphore(rate_limit)
    
    async def acquire(self):
        """Получение разрешения на запрос"""
        await self.semaphore.acquire()
        
        now = time.time()
        # Удаляем старые запросы
        self.requests = [req_time for req_time in self.requests if now - req_time < self.time_window]
        
        if len(self.requests) >= self.rate_limit:
            # Ждем освобождения слота
            await asyncio.sleep(self.time_window - (now - self.requests[0]))
        
        self.requests.append(now)
    
    def release(self):
        """Освобождение семафора"""
        self.semaphore.release()

async def make_request(session, url, rate_limiter):
    """Выполнение запроса с ограничением скорости"""
    await rate_limiter.acquire()
    try:
        async with session.get(url) as response:
            data = await response.json()
            return data
    finally:
        rate_limiter.release()

async def rate_limited_requests():
    """Пример запросов с ограничением скорости"""
    rate_limiter = RateLimiter(rate_limit=5, time_window=10)
    
    async with aiohttp.ClientSession() as session:
        urls = [f"https://api.github.com/users/user{i}" for i in range(20)]
        
        tasks = [make_request(session, url, rate_limiter) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        print(f"Выполнено {len(results)} запросов")
        return results

# Запуск
asyncio.run(rate_limited_requests())

Очереди и производители/потребители

# Асинхронные очереди
import asyncio
import random
import time

async def producer(queue, name, items):
    """Производитель данных"""
    for i in range(items):
        item = f"Элемент {i} от {name}"
        await queue.put(item)
        print(f"{name} добавил: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    
    # Сигнал завершения
    await queue.put(None)

async def consumer(queue, name):
    """Потребитель данных"""
    while True:
        item = await queue.get()
        if item is None:
            break
        
        print(f"{name} обработал: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def queue_example():
    """Пример работы с очередями"""
    queue = asyncio.Queue(maxsize=5)
    
    # Создание задач
    producers = [
        asyncio.create_task(producer(queue, "Производитель 1", 5)),
        asyncio.create_task(producer(queue, "Производитель 2", 3))
    ]
    
    consumers = [
        asyncio.create_task(consumer(queue, "Потребитель 1")),
        asyncio.create_task(consumer(queue, "Потребитель 2"))
    ]
    
    # Ожидание завершения производителей
    await asyncio.gather(*producers)
    
    # Сигнал завершения для потребителей
    await queue.put(None)
    await queue.put(None)
    
    # Ожидание завершения потребителей
    await asyncio.gather(*consumers)

# Запуск
asyncio.run(queue_example())

Асинхронные боты

Telegram Bot с asyncio

# Асинхронный Telegram бот
import asyncio
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes

# Настройка логирования
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

class AsyncTelegramBot:
    """Асинхронный Telegram бот"""
    
    def __init__(self, token):
        self.token = token
        self.application = Application.builder().token(token).build()
        self.setup_handlers()
    
    def setup_handlers(self):
        """Настройка обработчиков"""
        # Команды
        self.application.add_handler(CommandHandler("start", self.start_command))
        self.application.add_handler(CommandHandler("help", self.help_command))
        self.application.add_handler(CommandHandler("async", self.async_command))
        
        # Сообщения
        self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
    
    async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Обработка команды /start"""
        await update.message.reply_text(
            "Привет! Я асинхронный бот. Используй /async для демонстрации асинхронности."
        )
    
    async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Обработка команды /help"""
        help_text = """
        Доступные команды:
        /start - Начать работу с ботом
        /help - Показать это сообщение
        /async - Демонстрация асинхронности
        """
        await update.message.reply_text(help_text)
    
    async def async_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Демонстрация асинхронности"""
        message = await update.message.reply_text("Начинаем асинхронные задачи...")
        
        # Создание асинхронных задач
        tasks = [
            self.async_task("Задача 1", 2),
            self.async_task("Задача 2", 3),
            self.async_task("Задача 3", 1.5)
        ]
        
        # Параллельное выполнение
        results = await asyncio.gather(*tasks)
        
        # Отправка результатов
        result_text = "Результаты асинхронных задач:\n" + "\n".join(results)
        await message.edit_text(result_text)
    
    async def async_task(self, name, delay):
        """Асинхронная задача"""
        await asyncio.sleep(delay)
        return f"{name} завершена за {delay} секунд"
    
    async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Обработка текстовых сообщений"""
        user_message = update.message.text
        user_id = update.message.from_user.id
        
        # Асинхронная обработка сообщения
        response = await self.process_message_async(user_message, user_id)
        await update.message.reply_text(response)
    
    async def process_message_async(self, message, user_id):
        """Асинхронная обработка сообщения"""
        # Имитация асинхронной обработки
        await asyncio.sleep(0.5)
        
        # Простая обработка
        if "привет" in message.lower():
            return "Привет! Как дела?"
        elif "время" in message.lower():
            import datetime
            return f"Текущее время: {datetime.datetime.now().strftime('%H:%M:%S')}"
        else:
            return f"Вы написали: {message}"
    
    async def run(self):
        """Запуск бота"""
        logger.info("Запуск асинхронного Telegram бота...")
        await self.application.run_polling()

# Использование
async def main():
    bot = AsyncTelegramBot("YOUR_BOT_TOKEN")
    await bot.run()

# Запуск
if __name__ == "__main__":
    asyncio.run(main())

Discord Bot с asyncio

# Асинхронный Discord бот
import asyncio
import discord
from discord.ext import commands
import aiohttp
import json

class AsyncDiscordBot:
    """Асинхронный Discord бот"""
    
    def __init__(self, token):
        self.token = token
        self.bot = commands.Bot(command_prefix='!', intents=discord.Intents.all())
        self.setup_commands()
    
    def setup_commands(self):
        """Настройка команд"""
        @self.bot.command(name='async')
        async def async_command(ctx):
            """Демонстрация асинхронности"""
            message = await ctx.send("Выполняем асинхронные задачи...")
            
            # Создание асинхронных задач
            tasks = [
                self.fetch_user_data("octocat"),
                self.fetch_user_data("defunkt"),
                self.fetch_user_data("mojombo")
            ]
            
            # Параллельное выполнение
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Отправка результатов
            embed = discord.Embed(title="Результаты асинхронных запросов", color=0x00ff00)
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    embed.add_field(name=f"Задача {i+1}", value=f"Ошибка: {result}", inline=False)
                else:
                    embed.add_field(name=f"Задача {i+1}", value=f"Пользователь: {result['login']}", inline=False)
            
            await message.edit(content="", embed=embed)
        
        @self.bot.command(name='ping')
        async def ping_command(ctx):
            """Проверка задержки"""
            latency = self.bot.latency * 1000
            await ctx.send(f"Понг! Задержка: {latency:.2f}мс")
        
        @self.bot.event
        async def on_ready():
            """Событие готовности бота"""
            print(f'{self.bot.user} подключен к Discord!')
    
    async def fetch_user_data(self, username):
        """Получение данных пользователя GitHub"""
        async with aiohttp.ClientSession() as session:
            async with session.get(f'https://api.github.com/users/{username}') as response:
                if response.status == 200:
                    return await response.json()
                else:
                    raise Exception(f"Ошибка получения данных для {username}")
    
    async def run(self):
        """Запуск бота"""
        await self.bot.start(self.token)

# Использование
async def main():
    bot = AsyncDiscordBot("YOUR_DISCORD_TOKEN")
    await bot.run()

# Запуск
if __name__ == "__main__":
    asyncio.run(main())

Обработка ошибок

Асинхронная обработка ошибок

# Обработка ошибок в асинхронном коде
import asyncio
import aiohttp
import logging

logger = logging.getLogger(__name__)

class AsyncErrorHandler:
    """Обработчик ошибок для асинхронного кода"""
    
    @staticmethod
    async def safe_request(url, session):
        """Безопасный запрос с обработкой ошибок"""
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status
                    )
        except asyncio.TimeoutError:
            logger.error(f"Таймаут запроса к {url}")
            return None
        except aiohttp.ClientError as e:
            logger.error(f"Ошибка клиента для {url}: {e}")
            return None
        except Exception as e:
            logger.error(f"Неожиданная ошибка для {url}: {e}")
            return None
    
    @staticmethod
    async def retry_request(url, session, max_retries=3, delay=1):
        """Запрос с повторными попытками"""
        for attempt in range(max_retries):
            try:
                result = await AsyncErrorHandler.safe_request(url, session)
                if result is not None:
                    return result
                
                if attempt < max_retries - 1:
                    logger.info(f"Повторная попытка {attempt + 1} для {url}")
                    await asyncio.sleep(delay  (2 * attempt))  # Экспоненциальная задержка
                
            except Exception as e:
                logger.error(f"Попытка {attempt + 1} не удалась для {url}: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay  (2 * attempt))
        
        logger.error(f"Все попытки исчерпаны для {url}")
        return None

async def error_handling_example():
    """Пример обработки ошибок"""
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/nonexistent",
        "https://invalid-url.com",
        "https://api.github.com/users/defunkt"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [AsyncErrorHandler.retry_request(url, session) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"URL {i+1}: Ошибка - {result}")
            elif result is None:
                print(f"URL {i+1}: Не удалось получить данные")
            else:
                print(f"URL {i+1}: Успешно - {result.get('login', 'N/A')}")

# Запуск
asyncio.run(error_handling_example())

Circuit Breaker паттерн

# Circuit Breaker для асинхронного кода
import asyncio
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class AsyncCircuitBreaker:
    """Асинхронный Circuit Breaker"""
    
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    async def call(self, func, args, *kwargs):
        """Вызов функции через Circuit Breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit Breaker is OPEN")
        
        try:
            result = await func(args, *kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _should_attempt_reset(self):
        """Проверка возможности сброса"""
        return (time.time() - self.last_failure_time) >= self.recovery_timeout
    
    def _on_success(self):
        """Обработка успешного вызова"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        """Обработка неудачного вызова"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Пример использования
async def unreliable_service():
    """Ненадежный сервис"""
    import random
    if random.random() < 0.7:  # 70% вероятность ошибки
        raise Exception("Сервис недоступен")
    return "Данные получены"

async def circuit_breaker_example():
    """Пример использования Circuit Breaker"""
    breaker = AsyncCircuitBreaker(failure_threshold=3, recovery_timeout=5)
    
    for i in range(10):
        try:
            result = await breaker.call(unreliable_service)
            print(f"Попытка {i+1}: {result}")
        except Exception as e:
            print(f"Попытка {i+1}: {e}")
        
        await asyncio.sleep(1)

# Запуск
asyncio.run(circuit_breaker_example())

Производительность и оптимизация

Профилирование асинхронного кода

# Профилирование асинхронного кода
import asyncio
import time
import cProfile
import pstats
from functools import wraps

def async_profile(func):
    """Декоратор для профилирования асинхронных функций"""
    @wraps(func)
    async def wrapper(args, *kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        
        start_time = time.time()
        result = await func(args, *kwargs)
        end_time = time.time()
        
        profiler.disable()
        
        print(f"Функция {func.__name__} выполнена за {end_time - start_time:.2f} секунд")
        
        # Сохранение результатов профилирования
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(10)  # Топ 10 функций
        
        return result
    return wrapper

class AsyncProfiler:
    """Профилировщик для асинхронного кода"""
    
    def __init__(self):
        self.timings = {}
        self.call_counts = {}
    
    def time_function(self, func_name):
        """Декоратор для измерения времени выполнения"""
        def decorator(func):
            @wraps(func)
            async def wrapper(args, *kwargs):
                start_time = time.time()
                result = await func(args, *kwargs)
                end_time = time.time()
                
                execution_time = end_time - start_time
                self.timings[func_name] = self.timings.get(func_name, 0) + execution_time
                self.call_counts[func_name] = self.call_counts.get(func_name, 0) + 1
                
                return result
            return wrapper
        return decorator
    
    def print_stats(self):
        """Вывод статистики"""
        print("\n=== Статистика производительности ===")
        for func_name, total_time in self.timings.items():
            call_count = self.call_counts[func_name]
            avg_time = total_time / call_count
            print(f"{func_name}: {total_time:.3f}с (всего), {avg_time:.3f}с (среднее), {call_count} вызовов")

# Пример использования
profiler = AsyncProfiler()

@profiler.time_function("async_task")
async def async_task(delay):
    """Асинхронная задача"""
    await asyncio.sleep(delay)
    return f"Задача выполнена за {delay} секунд"

@async_profile
async def performance_example():
    """Пример измерения производительности"""
    tasks = [async_task(0.1) for _ in range(100)]
    results = await asyncio.gather(*tasks)
    return results

# Запуск
asyncio.run(performance_example())
profiler.print_stats()

Оптимизация памяти

# Оптимизация памяти в асинхронном коде
import asyncio
import gc
import tracemalloc
from typing import List, Dict, Any

class MemoryOptimizedProcessor:
    """Процессор с оптимизацией памяти"""
    
    def __init__(self, batch_size=1000):
        self.batch_size = batch_size
        self.processed_count = 0
    
    async def process_large_dataset(self, data: List[Dict[str, Any]]):
        """Обработка большого набора данных батчами"""
        results = []
        
        for i in range(0, len(data), self.batch_size):
            batch = data[i:i + self.batch_size]
            batch_results = await self.process_batch(batch)
            results.extend(batch_results)
            
            # Принудительная сборка мусора
            if i % (self.batch_size * 10) == 0:
                gc.collect()
            
            # Логирование прогресса
            if i % (self.batch_size * 100) == 0:
                print(f"Обработано {i} элементов")
        
        return results
    
    async def process_batch(self, batch: List[Dict[str, Any]]):
        """Обработка батча данных"""
        tasks = [self.process_item(item) for item in batch]
        return await asyncio.gather(*tasks)
    
    async def process_item(self, item: Dict[str, Any]):
        """Обработка одного элемента"""
        # Имитация обработки
        await asyncio.sleep(0.001)
        return {"processed": True, "id": item.get("id")}

async def memory_optimization_example():
    """Пример оптимизации памяти"""
    # Включение трассировки памяти
    tracemalloc.start()
    
    # Создание большого набора данных
    large_dataset = [{"id": i, "data": f"item_{i}"} for i in range(10000)]
    
    # Обработка с оптимизацией памяти
    processor = MemoryOptimizedProcessor(batch_size=500)
    results = await processor.process_large_dataset(large_dataset)
    
    # Получение статистики памяти
    current, peak = tracemalloc.get_traced_memory()
    print(f"Текущее использование памяти: {current / 1024 / 1024:.2f} MB")
    print(f"Пиковое использование памяти: {peak / 1024 / 1024:.2f} MB")
    
    tracemalloc.stop()
    
    return results

# Запуск
asyncio.run(memory_optimization_example())

Практические примеры

Асинхронный веб-скрапер

# Асинхронный веб-скрапер
import asyncio
import aiohttp
import aiofiles
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import time

class AsyncWebScraper:
    """Асинхронный веб-скрапер"""
    
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.visited_urls = set()
        self.scraped_data = []
    
    async def scrape_url(self, session, url):
        """Скрапинг одного URL"""
        async with self.semaphore:
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        soup = BeautifulSoup(html, 'html.parser')
                        
                        # Извлечение данных
                        data = {
                            'url': url,
                            'title': soup.title.string if soup.title else '',
                            'links': [urljoin(url, link.get('href')) for link in soup.find_all('a', href=True)],
                            'text': soup.get_text()[:500]  # Первые 500 символов
                        }
                        
                        return data
                    else:
                        return None
            except Exception as e:
                print(f"Ошибка при скрапинге {url}: {e}")
                return None
    
    async def scrape_multiple_urls(self, urls):
        """Скрапинг нескольких URL"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.scrape_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Фильтрация результатов
            valid_results = [result for result in results if result is not None and not isinstance(result, Exception)]
            return valid_results
    
    async def save_results(self, results, filename):
        """Сохранение результатов в файл"""
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            for result in results:
                await f.write(f"URL: {result['url']}\n")
                await f.write(f"Title: {result['title']}\n")
                await f.write(f"Text: {result['text']}\n")
                await f.write("-" * 50 + "\n")

async def web_scraper_example():
    """Пример использования веб-скрапера"""
    urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/json",
        "https://httpbin.org/xml",
        "https://httpbin.org/robots.txt"
    ]
    
    scraper = AsyncWebScraper(max_concurrent=5)
    
    start_time = time.time()
    results = await scraper.scrape_multiple_urls(urls)
    end_time = time.time()
    
    print(f"Скрапинг завершен за {end_time - start_time:.2f} секунд")
    print(f"Получено {len(results)} результатов")
    
    # Сохранение результатов
    await scraper.save_results(results, "scraped_data.txt")
    
    return results

# Запуск
asyncio.run(web_scraper_example())

Асинхронный API клиент

# Асинхронный API клиент
import asyncio
import aiohttp
import json
from typing import Dict, List, Optional

class AsyncAPIClient:
    """Асинхронный API клиент"""
    
    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url
        self.api_key = api_key
        self.session = None
    
    async def __aenter__(self):
        """Асинхронный контекстный менеджер - вход"""
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Асинхронный контекстный менеджер - выход"""
        if self.session:
            await self.session.close()
    
    async def get(self, endpoint: str, params: Dict = None) -> Dict:
        """GET запрос"""
        url = f"{self.base_url}/{endpoint}"
        headers = {}
        
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        
        async with self.session.get(url, params=params, headers=headers) as response:
            if response.status == 200:
                return await response.json()
            else:
                raise Exception(f"API ошибка: {response.status}")
    
    async def post(self, endpoint: str, data: Dict) -> Dict:
        """POST запрос"""
        url = f"{self.base_url}/{endpoint}"
        headers = {'Content-Type': 'application/json'}
        
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        
        async with self.session.post(url, json=data, headers=headers) as response:
            if response.status in [200, 201]:
                return await response.json()
            else:
                raise Exception(f"API ошибка: {response.status}")
    
    async def batch_requests(self, requests: List[Dict]) -> List[Dict]:
        """Пакетные запросы"""
        tasks = []
        for req in requests:
            if req['method'] == 'GET':
                task = self.get(req['endpoint'], req.get('params'))
            elif req['method'] == 'POST':
                task = self.post(req['endpoint'], req.get('data'))
            else:
                continue
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

async def api_client_example():
    """Пример использования API клиента"""
    async with AsyncAPIClient("https://api.github.com") as client:
        # Одиночные запросы
        user_data = await client.get("users/octocat")
        print(f"Пользователь: {user_data['login']}")
        
        # Пакетные запросы
        requests = [
            {"method": "GET", "endpoint": "users/octocat"},
            {"method": "GET", "endpoint": "users/defunkt"},
            {"method": "GET", "endpoint": "users/mojombo"}
        ]
        
        results = await client.batch_requests(requests)
        for result in results:
            if isinstance(result, Exception):
                print(f"Ошибка: {result}")
            else:
                print(f"Пользователь: {result['login']}")

# Запуск
asyncio.run(api_client_example())

Заключение

В этой статье мы рассмотрели асинхронное программирование в Python:
  • ✅ Основы асинхронности и asyncio
  • ✅ Конкурентность и параллелизм
  • ✅ Создание асинхронных ботов
  • ✅ Обработка ошибок и паттерны
  • ✅ Производительность и оптимизация
  • ✅ Практические примеры
Асинхронное программирование позволяет создавать высокопроизводительные и масштабируемые боты.

Полезные ссылки

365 просмотров
2 лайков
0 комментариев