Обработка ошибок в ботах

В этой статье мы рассмотрим различные стратегии обработки ошибок в ботах, включая логирование, мониторинг, восстановление и предотвращение сбоев.

Содержание

Основы обработки ошибок

Типы ошибок в ботах

# Классификация ошибок в ботах
import logging
from enum import Enum
from typing import Optional, Dict, Any

class ErrorType(Enum):
    """Типы ошибок в ботах"""
    NETWORK = "network"
    API_LIMIT = "api_limit"
    AUTHENTICATION = "authentication"
    VALIDATION = "validation"
    DATABASE = "database"
    EXTERNAL_SERVICE = "external_service"
    INTERNAL = "internal"

class BotError(Exception):
    """Базовый класс для ошибок бота"""
    
    def __init__(self, message: str, error_type: ErrorType, 
                 user_id: Optional[str] = None, 
                 context: Optional[Dict[str, Any]] = None):
        super().__init__(message)
        self.error_type = error_type
        self.user_id = user_id
        self.context = context or {}
        self.timestamp = time.time()

class NetworkError(BotError):
    """Ошибка сети"""
    def __init__(self, message: str, user_id: Optional[str] = None):
        super().__init__(message, ErrorType.NETWORK, user_id)

class APILimitError(BotError):
    """Ошибка лимита API"""
    def __init__(self, message: str, retry_after: int, user_id: Optional[str] = None):
        super().__init__(message, ErrorType.API_LIMIT, user_id)
        self.retry_after = retry_after

class ValidationError(BotError):
    """Ошибка валидации"""
    def __init__(self, message: str, field: str, user_id: Optional[str] = None):
        super().__init__(message, ErrorType.VALIDATION, user_id)
        self.field = field

# Примеры использования
def demonstrate_error_types():
    """Демонстрация различных типов ошибок"""
    try:
        # Сетевая ошибка
        raise NetworkError("Не удалось подключиться к серверу", "user123")
    except NetworkError as e:
        print(f"Сетевая ошибка: {e}, пользователь: {e.user_id}")
    
    try:
        # Ошибка лимита API
        raise APILimitError("Превышен лимит запросов", 60, "user456")
    except APILimitError as e:
        print(f"Лимит API: {e}, повторить через: {e.retry_after}с")
    
    try:
        # Ошибка валидации
        raise ValidationError("Неверный формат email", "email", "user789")
    except ValidationError as e:
        print(f"Валидация: {e}, поле: {e.field}")

Структурированная обработка ошибок

# Структурированная обработка ошибок
import time
from typing import Callable, Any, Optional
from functools import wraps

class ErrorHandler:
    """Обработчик ошибок для ботов"""
    
    def __init__(self, logger: logging.Logger):
        self.logger = logger
        self.error_counts = {}
        self.last_errors = {}
    
    def handle_error(self, error: Exception, user_id: Optional[str] = None, 
                    context: Optional[Dict[str, Any]] = None):
        """Обработка ошибки"""
        error_type = type(error).__name__
        timestamp = time.time()
        
        # Подсчет ошибок
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
        self.last_errors[error_type] = timestamp
        
        # Логирование
        self.logger.error(
            f"Ошибка {error_type}: {str(error)}",
            extra={
                'user_id': user_id,
                'error_type': error_type,
                'context': context,
                'timestamp': timestamp
            }
        )
        
        # Дополнительная обработка в зависимости от типа
        if isinstance(error, APILimitError):
            return self._handle_api_limit_error(error, user_id)
        elif isinstance(error, NetworkError):
            return self._handle_network_error(error, user_id)
        elif isinstance(error, ValidationError):
            return self._handle_validation_error(error, user_id)
        else:
            return self._handle_generic_error(error, user_id)
    
    def _handle_api_limit_error(self, error: APILimitError, user_id: Optional[str]):
        """Обработка ошибки лимита API"""
        return {
            'message': 'Превышен лимит запросов. Попробуйте позже.',
            'retry_after': error.retry_after,
            'action': 'wait'
        }
    
    def _handle_network_error(self, error: NetworkError, user_id: Optional[str]):
        """Обработка сетевой ошибки"""
        return {
            'message': 'Проблемы с сетью. Попробуйте еще раз.',
            'action': 'retry'
        }
    
    def _handle_validation_error(self, error: ValidationError, user_id: Optional[str]):
        """Обработка ошибки валидации"""
        return {
            'message': f'Ошибка в поле {error.field}: {str(error)}',
            'action': 'fix_input'
        }
    
    def _handle_generic_error(self, error: Exception, user_id: Optional[str]):
        """Обработка общей ошибки"""
        return {
            'message': 'Произошла ошибка. Обратитесь к администратору.',
            'action': 'contact_support'
        }

# Декоратор для автоматической обработки ошибок
def error_handler_decorator(error_handler: ErrorHandler):
    """Декоратор для автоматической обработки ошибок"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(args, *kwargs):
            try:
                return await func(args, *kwargs)
            except Exception as e:
                # Извлечение user_id из аргументов
                user_id = None
                if args and hasattr(args[0], 'from_user'):
                    user_id = args[0].from_user.id
                
                return error_handler.handle_error(e, user_id)
        return wrapper
    return decorator

Логирование и мониторинг

Настройка логирования

# Настройка логирования для ботов
import logging
import logging.handlers
import json
from datetime import datetime
from typing import Dict, Any

class BotLogger:
    """Логгер для ботов"""
    
    def __init__(self, name: str, log_file: str = "bot.log"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # Форматтер
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        # Файловый хендлер с ротацией
        file_handler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=1010241024, backupCount=5
        )
        file_handler.setFormatter(formatter)
        
        # Консольный хендлер
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        
        # Добавление хендлеров
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
    
    def log_user_action(self, user_id: str, action: str, 
                       details: Optional[Dict[str, Any]] = None):
        """Логирование действий пользователя"""
        self.logger.info(
            f"User action: {action}",
            extra={
                'user_id': user_id,
                'action': action,
                'details': details or {},
                'timestamp': datetime.now().isoformat()
            }
        )
    
    def log_error(self, error: Exception, user_id: Optional[str] = None,
                 context: Optional[Dict[str, Any]] = None):
        """Логирование ошибок"""
        self.logger.error(
            f"Error: {str(error)}",
            extra={
                'user_id': user_id,
                'error_type': type(error).__name__,
                'context': context or {},
                'timestamp': datetime.now().isoformat()
            }
        )
    
    def log_performance(self, operation: str, duration: float,
                       user_id: Optional[str] = None):
        """Логирование производительности"""
        self.logger.info(
            f"Performance: {operation} took {duration:.3f}s",
            extra={
                'user_id': user_id,
                'operation': operation,
                'duration': duration,
                'timestamp': datetime.now().isoformat()
            }
        )

# Структурированное логирование
class StructuredLogger:
    """Структурированный логгер"""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # JSON форматтер
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        self.logger.addHandler(handler)
    
    def log(self, level: str, message: str, kwargs):
        """Структурированное логирование"""
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'level': level,
            'message': message,
            kwargs
        }
        
        getattr(self.logger, level.lower())(json.dumps(log_data))

class JsonFormatter(logging.Formatter):
    """JSON форматтер для логов"""
    
    def format(self, record):
        log_data = {
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        
        # Добавление дополнительных полей
        if hasattr(record, 'user_id'):
            log_data['user_id'] = record.user_id
        if hasattr(record, 'error_type'):
            log_data['error_type'] = record.error_type
        if hasattr(record, 'context'):
            log_data['context'] = record.context
        
        return json.dumps(log_data)

Мониторинг и метрики

# Мониторинг и метрики для ботов
import time
from collections import defaultdict, deque
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class Metric:
    """Метрика"""
    name: str
    value: float
    timestamp: float
    tags: Dict[str, str]

class BotMetrics:
    """Метрики бота"""
    
    def __init__(self):
        self.metrics = defaultdict(list)
        self.counters = defaultdict(int)
        self.timers = defaultdict(list)
        self.gauges = defaultdict(float)
    
    def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
        """Увеличение счетчика"""
        self.counters[name] += value
        self.metrics[name].append(Metric(name, value, time.time(), tags or {}))
    
    def record_timer(self, name: str, duration: float, tags: Dict[str, str] = None):
        """Запись времени выполнения"""
        self.timers[name].append(duration)
        self.metrics[name].append(Metric(name, duration, time.time(), tags or {}))
    
    def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Установка значения"""
        self.gauges[name] = value
        self.metrics[name].append(Metric(name, value, time.time(), tags or {}))
    
    def get_stats(self, name: str) -> Dict[str, float]:
        """Получение статистики"""
        if name in self.timers and self.timers[name]:
            durations = self.timers[name]
            return {
                'count': len(durations),
                'min': min(durations),
                'max': max(durations),
                'avg': sum(durations) / len(durations),
                'p95': sorted(durations)[int(len(durations) * 0.95)]
            }
        return {}
    
    def get_recent_metrics(self, name: str, minutes: int = 5) -> List[Metric]:
        """Получение недавних метрик"""
        cutoff_time = time.time() - (minutes * 60)
        return [m for m in self.metrics[name] if m.timestamp >= cutoff_time]

# Декоратор для автоматического измерения производительности
def measure_performance(metrics: BotMetrics, operation_name: str):
    """Декоратор для измерения производительности"""
    def decorator(func):
        @wraps(func)
        async def wrapper(args, *kwargs):
            start_time = time.time()
            try:
                result = await func(args, *kwargs)
                duration = time.time() - start_time
                
                # Запись метрики успеха
                metrics.record_timer(f"{operation_name}.success", duration)
                metrics.increment_counter(f"{operation_name}.success_count")
                
                return result
            except Exception as e:
                duration = time.time() - start_time
                
                # Запись метрики ошибки
                metrics.record_timer(f"{operation_name}.error", duration)
                metrics.increment_counter(f"{operation_name}.error_count")
                
                raise e
        return wrapper
    return decorator

Стратегии восстановления

Retry механизмы

# Механизмы повторных попыток
import asyncio
import random
from typing import Callable, Any, Optional
from functools import wraps

class RetryStrategy:
    """Стратегия повторных попыток"""
    
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, exponential_base: float = 2.0,
                 jitter: bool = True):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
    
    def get_delay(self, attempt: int) -> float:
        """Получение задержки для попытки"""
        delay = self.base_delay  (self.exponential_base * attempt)
        delay = min(delay, self.max_delay)
        
        if self.jitter:
            # Добавление случайности для избежания thundering herd
            delay = (0.5 + random.random()  0.5)
        
        return delay

class RetryHandler:
    """Обработчик повторных попыток"""
    
    def __init__(self, strategy: RetryStrategy, 
                 retryable_exceptions: tuple = (Exception,)):
        self.strategy = strategy
        self.retryable_exceptions = retryable_exceptions
    
    async def execute_with_retry(self, func: Callable, args, *kwargs) -> Any:
        """Выполнение функции с повторными попытками"""
        last_exception = None
        
        for attempt in range(self.strategy.max_retries + 1):
            try:
                if asyncio.iscoroutinefunction(func):
                    return await func(args, *kwargs)
                else:
                    return func(args, *kwargs)
            except self.retryable_exceptions as e:
                last_exception = e
                
                if attempt < self.strategy.max_retries:
                    delay = self.strategy.get_delay(attempt)
                    print(f"Попытка {attempt + 1} не удалась: {e}. Повтор через {delay:.2f}с")
                    await asyncio.sleep(delay)
                else:
                    print(f"Все {self.strategy.max_retries + 1} попыток исчерпаны")
                    break
        
        raise last_exception

# Декоратор для автоматических повторных попыток
def retry(retry_strategy: RetryStrategy, 
          retryable_exceptions: tuple = (Exception,)):
    """Декоратор для повторных попыток"""
    def decorator(func):
        @wraps(func)
        async def wrapper(args, *kwargs):
            handler = RetryHandler(retry_strategy, retryable_exceptions)
            return await handler.execute_with_retry(func, args, *kwargs)
        return wrapper
    return decorator

# Пример использования
@retry(RetryStrategy(max_retries=3, base_delay=1.0))
async def unreliable_api_call():
    """Ненадежный API вызов"""
    import random
    if random.random() < 0.7:  # 70% вероятность ошибки
        raise Exception("API недоступен")
    return "Успешный ответ"

# Circuit Breaker для предотвращения каскадных сбоев
class CircuitBreaker:
    """Circuit Breaker для предотвращения каскадных сбоев"""
    
    def __init__(self, failure_threshold: int = 5, 
                 recovery_timeout: float = 60.0,
                 expected_exception: type = Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func: Callable, args, *kwargs) -> Any:
        """Вызов функции через Circuit Breaker"""
        if self.state == "OPEN":
            if self._should_attempt_reset():
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit Breaker is OPEN")
        
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(args, *kwargs)
            else:
                result = func(args, *kwargs)
            
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _should_attempt_reset(self) -> bool:
        """Проверка возможности сброса"""
        return (time.time() - self.last_failure_time) >= self.recovery_timeout
    
    def _on_success(self):
        """Обработка успешного вызова"""
        self.failure_count = 0
        self.state = "CLOSED"
    
    def _on_failure(self):
        """Обработка неудачного вызова"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

Предотвращение ошибок

Валидация входных данных

# Валидация входных данных
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
import re

@dataclass
class ValidationResult:
    """Результат валидации"""
    is_valid: bool
    errors: List[str]
    warnings: List[str]

class InputValidator:
    """Валидатор входных данных"""
    
    def __init__(self):
        self.rules = {}
    
    def add_rule(self, field: str, rule_func: Callable, error_message: str):
        """Добавление правила валидации"""
        if field not in self.rules:
            self.rules[field] = []
        self.rules[field].append((rule_func, error_message))
    
    def validate(self, data: Dict[str, Any]) -> ValidationResult:
        """Валидация данных"""
        errors = []
        warnings = []
        
        for field, rules in self.rules.items():
            value = data.get(field)
            
            for rule_func, error_message in rules:
                try:
                    if not rule_func(value):
                        errors.append(f"{field}: {error_message}")
                except Exception as e:
                    errors.append(f"{field}: Ошибка валидации - {str(e)}")
        
        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings
        )

# Предопределенные правила валидации
class ValidationRules:
    """Правила валидации"""
    
    @staticmethod
    def required(value: Any) -> bool:
        """Проверка обязательности"""
        return value is not None and value != ""
    
    @staticmethod
    def email(value: str) -> bool:
        """Проверка email"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, value))
    
    @staticmethod
    def phone(value: str) -> bool:
        """Проверка телефона"""
        pattern = r'^\+?[1-9]\d{1,14}$'
        return bool(re.match(pattern, value))
    
    @staticmethod
    def min_length(min_len: int):
        """Минимальная длина"""
        def rule(value: str) -> bool:
            return len(value) >= min_len
        return rule
    
    @staticmethod
    def max_length(max_len: int):
        """Максимальная длина"""
        def rule(value: str) -> bool:
            return len(value) <= max_len
        return rule
    
    @staticmethod
    def numeric(value: str) -> bool:
        """Проверка числового значения"""
        try:
            float(value)
            return True
        except (ValueError, TypeError):
            return False
    
    @staticmethod
    def range(min_val: float, max_val: float):
        """Проверка диапазона"""
        def rule(value: Union[str, int, float]) -> bool:
            try:
                num_val = float(value)
                return min_val <= num_val <= max_val
            except (ValueError, TypeError):
                return False
        return rule

# Пример использования валидации
def setup_user_validation():
    """Настройка валидации пользователя"""
    validator = InputValidator()
    
    # Правила для email
    validator.add_rule("email", ValidationRules.required, "Email обязателен")
    validator.add_rule("email", ValidationRules.email, "Неверный формат email")
    
    # Правила для имени
    validator.add_rule("name", ValidationRules.required, "Имя обязательно")
    validator.add_rule("name", ValidationRules.min_length(2), "Имя должно содержать минимум 2 символа")
    validator.add_rule("name", ValidationRules.max_length(50), "Имя не должно превышать 50 символов")
    
    # Правила для возраста
    validator.add_rule("age", ValidationRules.numeric, "Возраст должен быть числом")
    validator.add_rule("age", ValidationRules.range(13, 120), "Возраст должен быть от 13 до 120 лет")
    
    return validator

# Тестирование валидации
def test_validation():
    """Тестирование валидации"""
    validator = setup_user_validation()
    
    # Валидные данные
    valid_data = {
        "email": "user@example.com",
        "name": "Иван Иванов",
        "age": "25"
    }
    result = validator.validate(valid_data)
    print(f"Валидные данные: {result.is_valid}, ошибки: {result.errors}")
    
    # Невалидные данные
    invalid_data = {
        "email": "invalid-email",
        "name": "A",
        "age": "150"
    }
    result = validator.validate(invalid_data)
    print(f"Невалидные данные: {result.is_valid}, ошибки: {result.errors}")

Обработка внешних API

Обработка ошибок API

# Обработка ошибок внешних API
import aiohttp
import asyncio
from typing import Dict, Any, Optional
from enum import Enum

class APIErrorType(Enum):
    """Типы ошибок API"""
    RATE_LIMIT = "rate_limit"
    AUTHENTICATION = "authentication"
    PERMISSION_DENIED = "permission_denied"
    NOT_FOUND = "not_found"
    SERVER_ERROR = "server_error"
    NETWORK_ERROR = "network_error"
    TIMEOUT = "timeout"

class APIError(Exception):
    """Ошибка API"""
    
    def __init__(self, message: str, error_type: APIErrorType,
                 status_code: Optional[int] = None,
                 retry_after: Optional[int] = None):
        super().__init__(message)
        self.error_type = error_type
        self.status_code = status_code
        self.retry_after = retry_after

class APIClient:
    """Клиент для работы с API"""
    
    def __init__(self, base_url: str, api_key: Optional[str] = None,
                 timeout: float = 30.0, max_retries: int = 3):
        self.base_url = base_url
        self.api_key = api_key
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
    
    async def request(self, method: str, endpoint: str, 
                     data: Optional[Dict] = None,
                     params: Optional[Dict] = None) -> Dict[str, Any]:
        """Выполнение запроса к API"""
        url = f"{self.base_url}/{endpoint}"
        headers = {"Content-Type": "application/json"}
        
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            for attempt in range(self.max_retries):
                try:
                    async with session.request(
                        method, url, json=data, params=params, headers=headers
                    ) as response:
                        return await self._handle_response(response)
                
                except asyncio.TimeoutError:
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2  attempt)  # Экспоненциальная задержка
                        continue
                    raise APIError("Таймаут запроса", APIErrorType.TIMEOUT)
                
                except aiohttp.ClientError as e:
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2  attempt)
                        continue
                    raise APIError(f"Ошибка сети: {str(e)}", APIErrorType.NETWORK_ERROR)
    
    async def _handle_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]:
        """Обработка ответа API"""
        try:
            data = await response.json()
        except aiohttp.ContentTypeError:
            data = {"error": "Неверный формат ответа"}
        
        if response.status == 200:
            return data
        elif response.status == 401:
            raise APIError("Ошибка аутентификации", APIErrorType.AUTHENTICATION, 401)
        elif response.status == 403:
            raise APIError("Доступ запрещен", APIErrorType.PERMISSION_DENIED, 403)
        elif response.status == 404:
            raise APIError("Ресурс не найден", APIErrorType.NOT_FOUND, 404)
        elif response.status == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            raise APIError("Превышен лимит запросов", APIErrorType.RATE_LIMIT, 429, retry_after)
        elif response.status >= 500:
            raise APIError("Ошибка сервера", APIErrorType.SERVER_ERROR, response.status)
        else:
            raise APIError(f"Неизвестная ошибка: {response.status}", APIErrorType.SERVER_ERROR, response.status)

# Обработчик ошибок API для ботов
class APIErrorHandler:
    """Обработчик ошибок API"""
    
    def __init__(self, logger: logging.Logger):
        self.logger = logger
        self.rate_limit_delays = {}
    
    async def handle_api_error(self, error: APIError, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Обработка ошибки API"""
        self.logger.error(f"API ошибка: {error.error_type.value} - {str(error)}")
        
        if error.error_type == APIErrorType.RATE_LIMIT:
            return await self._handle_rate_limit(error, user_id)
        elif error.error_type == APIErrorType.AUTHENTICATION:
            return await self._handle_authentication_error(error, user_id)
        elif error.error_type == APIErrorType.NETWORK_ERROR:
            return await self._handle_network_error(error, user_id)
        else:
            return await self._handle_generic_api_error(error, user_id)
    
    async def _handle_rate_limit(self, error: APIError, user_id: Optional[str]) -> Dict[str, Any]:
        """Обработка ошибки лимита запросов"""
        retry_after = error.retry_after or 60
        
        # Сохранение информации о задержке для пользователя
        if user_id:
            self.rate_limit_delays[user_id] = time.time() + retry_after
        
        return {
            "message": f"Превышен лимит запросов. Попробуйте через {retry_after} секунд.",
            "retry_after": retry_after,
            "action": "wait"
        }
    
    async def _handle_authentication_error(self, error: APIError, user_id: Optional[str]) -> Dict[str, Any]:
        """Обработка ошибки аутентификации"""
        return {
            "message": "Ошибка аутентификации. Обратитесь к администратору.",
            "action": "contact_support"
        }
    
    async def _handle_network_error(self, error: APIError, user_id: Optional[str]) -> Dict[str, Any]:
        """Обработка сетевой ошибки"""
        return {
            "message": "Проблемы с сетью. Попробуйте еще раз.",
            "action": "retry"
        }
    
    async def _handle_generic_api_error(self, error: APIError, user_id: Optional[str]) -> Dict[str, Any]:
        """Обработка общей ошибки API"""
        return {
            "message": "Временная ошибка сервиса. Попробуйте позже.",
            "action": "retry_later"
        }

Тестирование обработки ошибок

Unit тесты для обработки ошибок

# Тесты для обработки ошибок
import pytest
import asyncio
from unittest.mock import Mock, patch
import time

class TestErrorHandling:
    """Тесты обработки ошибок"""
    
    def test_network_error_handling(self):
        """Тест обработки сетевой ошибки"""
        error_handler = ErrorHandler(logging.getLogger())
        error = NetworkError("Сеть недоступна", "user123")
        
        result = error_handler.handle_error(error, "user123")
        
        assert result["action"] == "retry"
        assert "сеть" in result["message"].lower()
    
    def test_api_limit_error_handling(self):
        """Тест обработки ошибки лимита API"""
        error_handler = ErrorHandler(logging.getLogger())
        error = APILimitError("Лимит превышен", 60, "user456")
        
        result = error_handler.handle_error(error, "user456")
        
        assert result["action"] == "wait"
        assert result["retry_after"] == 60
    
    def test_validation_error_handling(self):
        """Тест обработки ошибки валидации"""
        error_handler = ErrorHandler(logging.getLogger())
        error = ValidationError("Неверный email", "email", "user789")
        
        result = error_handler.handle_error(error, "user789")
        
        assert result["action"] == "fix_input"
        assert "email" in result["message"]
    
    @pytest.mark.asyncio
    async def test_retry_mechanism(self):
        """Тест механизма повторных попыток"""
        call_count = 0
        
        async def failing_function():
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise Exception("Временная ошибка")
            return "Успех"
        
        strategy = RetryStrategy(max_retries=3, base_delay=0.1)
        handler = RetryHandler(strategy)
        
        result = await handler.execute_with_retry(failing_function)
        
        assert result == "Успех"
        assert call_count == 3
    
    @pytest.mark.asyncio
    async def test_circuit_breaker(self):
        """Тест Circuit Breaker"""
        circuit_breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.1)
        
        async def failing_function():
            raise Exception("Ошибка")
        
        # Первые два вызова должны пройти
        for _ in range(2):
            with pytest.raises(Exception):
                await circuit_breaker.call(failing_function)
        
        # Третий вызов должен быть заблокирован
        with pytest.raises(Exception, match="Circuit Breaker is OPEN"):
            await circuit_breaker.call(failing_function)
        
        # После таймаута должен перейти в HALF_OPEN
        await asyncio.sleep(0.2)
        with pytest.raises(Exception):
            await circuit_breaker.call(failing_function)
    
    def test_input_validation(self):
        """Тест валидации входных данных"""
        validator = InputValidator()
        validator.add_rule("email", ValidationRules.email, "Неверный email")
        validator.add_rule("name", ValidationRules.required, "Имя обязательно")
        
        # Валидные данные
        valid_data = {"email": "test@example.com", "name": "Иван"}
        result = validator.validate(valid_data)
        assert result.is_valid
        
        # Невалидные данные
        invalid_data = {"email": "invalid-email", "name": ""}
        result = validator.validate(invalid_data)
        assert not result.is_valid
        assert len(result.errors) == 2

# Интеграционные тесты
class TestIntegration:
    """Интеграционные тесты"""
    
    @pytest.mark.asyncio
    async def test_api_client_error_handling(self):
        """Тест обработки ошибок API клиента"""
        with patch('aiohttp.ClientSession') as mock_session:
            # Настройка мока для ошибки 429
            mock_response = Mock()
            mock_response.status = 429
            mock_response.headers = {"Retry-After": "60"}
            mock_response.json.return_value = {"error": "Rate limit exceeded"}
            
            mock_session.return_value.__aenter__.return_value.request.return_value.__aenter__.return_value = mock_response
            
            client = APIClient("https://api.example.com")
            
            with pytest.raises(APIError) as exc_info:
                await client.request("GET", "test")
            
            assert exc_info.value.error_type == APIErrorType.RATE_LIMIT
            assert exc_info.value.retry_after == 60

Мониторинг в продакшене

Система мониторинга

# Система мониторинга для продакшена
import asyncio
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta

@dataclass
class Alert:
    """Алерт"""
    id: str
    type: str
    severity: str
    message: str
    timestamp: float
    metadata: Dict[str, Any]

class MonitoringSystem:
    """Система мониторинга"""
    
    def __init__(self):
        self.metrics = BotMetrics()
        self.alerts = []
        self.alert_rules = {}
        self.health_checks = {}
    
    def add_alert_rule(self, name: str, condition_func: Callable, 
                      severity: str = "warning"):
        """Добавление правила алерта"""
        self.alert_rules[name] = {
            "condition": condition_func,
            "severity": severity
        }
    
    def add_health_check(self, name: str, check_func: Callable):
        """Добавление проверки здоровья"""
        self.health_checks[name] = check_func
    
    async def check_alerts(self):
        """Проверка алертов"""
        for name, rule in self.alert_rules.items():
            try:
                if rule["condition"]():
                    alert = Alert(
                        id=f"{name}_{int(time.time())}",
                        type=name,
                        severity=rule["severity"],
                        message=f"Сработал алерт: {name}",
                        timestamp=time.time(),
                        metadata={}
                    )
                    self.alerts.append(alert)
                    await self._send_alert(alert)
            except Exception as e:
                print(f"Ошибка проверки алерта {name}: {e}")
    
    async def run_health_checks(self) -> Dict[str, bool]:
        """Запуск проверок здоровья"""
        results = {}
        for name, check_func in self.health_checks.items():
            try:
                if asyncio.iscoroutinefunction(check_func):
                    result = await check_func()
                else:
                    result = check_func()
                results[name] = result
            except Exception as e:
                print(f"Ошибка проверки здоровья {name}: {e}")
                results[name] = False
        return results
    
    async def _send_alert(self, alert: Alert):
        """Отправка алерта"""
        # Здесь можно интегрировать с внешними системами мониторинга
        print(f"🚨 АЛЕРТ [{alert.severity.upper()}]: {alert.message}")
        
        # Отправка в Slack, Telegram, email и т.д.
        # await self._send_to_slack(alert)
        # await self._send_to_telegram(alert)
    
    def get_system_status(self) -> Dict[str, Any]:
        """Получение статуса системы"""
        return {
            "timestamp": datetime.now().isoformat(),
            "metrics": {
                "total_requests": sum(self.metrics.counters.values()),
                "error_rate": self._calculate_error_rate(),
                "avg_response_time": self._calculate_avg_response_time()
            },
            "alerts": [asdict(alert) for alert in self.alerts[-10:]],  # Последние 10 алертов
            "health_checks": asyncio.run(self.run_health_checks())
        }
    
    def _calculate_error_rate(self) -> float:
        """Расчет процента ошибок"""
        total_requests = sum(self.metrics.counters.values())
        error_requests = sum(
            count for name, count in self.metrics.counters.items() 
            if "error" in name.lower()
        )
        return (error_requests / total_requests * 100) if total_requests > 0 else 0
    
    def _calculate_avg_response_time(self) -> float:
        """Расчет среднего времени ответа"""
        all_times = []
        for times in self.metrics.timers.values():
            all_times.extend(times)
        return sum(all_times) / len(all_times) if all_times else 0

# Примеры правил мониторинга
def setup_monitoring_rules(monitoring: MonitoringSystem):
    """Настройка правил мониторинга"""
    
    # Правило для высокой частоты ошибок
    def high_error_rate():
        error_rate = monitoring._calculate_error_rate()
        return error_rate > 10  # Более 10% ошибок
    
    monitoring.add_alert_rule("high_error_rate", high_error_rate, "critical")
    
    # Правило для медленного ответа
    def slow_response():
        avg_time = monitoring._calculate_avg_response_time()
        return avg_time > 5.0  # Более 5 секунд
    
    monitoring.add_alert_rule("slow_response", slow_response, "warning")
    
    # Проверка здоровья базы данных
    async def database_health():
        try:
            # Здесь должна быть проверка подключения к БД
            return True
        except Exception:
            return False
    
    monitoring.add_health_check("database", database_health)
    
    # Проверка внешнего API
    async def external_api_health():
        try:
            # Проверка доступности внешнего API
            return True
        except Exception:
            return False
    
    monitoring.add_health_check("external_api", external_api_health)

# Запуск мониторинга
async def run_monitoring():
    """Запуск системы мониторинга"""
    monitoring = MonitoringSystem()
    setup_monitoring_rules(monitoring)
    
    while True:
        try:
            await monitoring.check_alerts()
            await asyncio.sleep(60)  # Проверка каждую минуту
        except Exception as e:
            print(f"Ошибка мониторинга: {e}")
            await asyncio.sleep(60)

Заключение

В этой статье мы рассмотрели комплексный подход к обработке ошибок в ботах:
  • ✅ Классификация и структурирование ошибок
  • ✅ Логирование и мониторинг
  • ✅ Стратегии восстановления и повторных попыток
  • ✅ Предотвращение ошибок через валидацию
  • ✅ Обработка ошибок внешних API
  • ✅ Тестирование обработки ошибок
  • ✅ Мониторинг в продакшене
Правильная обработка ошибок критически важна для стабильной работы ботов в продакшене.

Полезные ссылки

771 просмотров
50 лайков
0 комментариев