Обработка ошибок в ботах
В этой статье мы рассмотрим различные стратегии обработки ошибок в ботах, включая логирование, мониторинг, восстановление и предотвращение сбоев.Содержание
- Основы обработки ошибок
- Логирование и мониторинг
- Стратегии восстановления
- Предотвращение ошибок
- Обработка внешних API
- Тестирование обработки ошибок
- Мониторинг в продакшене
Основы обработки ошибок
Типы ошибок в ботах
# Классификация ошибок в ботах
import logging
from enum import Enum
from typing import Optional, Dict, Any
class ErrorType(Enum):
"""Типы ошибок в ботах"""
NETWORK = "network"
API_LIMIT = "api_limit"
AUTHENTICATION = "authentication"
VALIDATION = "validation"
DATABASE = "database"
EXTERNAL_SERVICE = "external_service"
INTERNAL = "internal"
class BotError(Exception):
"""Базовый класс для ошибок бота"""
def __init__(self, message: str, error_type: ErrorType,
user_id: Optional[str] = None,
context: Optional[Dict[str, Any]] = None):
super().__init__(message)
self.error_type = error_type
self.user_id = user_id
self.context = context or {}
self.timestamp = time.time()
class NetworkError(BotError):
"""Ошибка сети"""
def __init__(self, message: str, user_id: Optional[str] = None):
super().__init__(message, ErrorType.NETWORK, user_id)
class APILimitError(BotError):
"""Ошибка лимита API"""
def __init__(self, message: str, retry_after: int, user_id: Optional[str] = None):
super().__init__(message, ErrorType.API_LIMIT, user_id)
self.retry_after = retry_after
class ValidationError(BotError):
"""Ошибка валидации"""
def __init__(self, message: str, field: str, user_id: Optional[str] = None):
super().__init__(message, ErrorType.VALIDATION, user_id)
self.field = field
# Примеры использования
def demonstrate_error_types():
"""Демонстрация различных типов ошибок"""
try:
# Сетевая ошибка
raise NetworkError("Не удалось подключиться к серверу", "user123")
except NetworkError as e:
print(f"Сетевая ошибка: {e}, пользователь: {e.user_id}")
try:
# Ошибка лимита API
raise APILimitError("Превышен лимит запросов", 60, "user456")
except APILimitError as e:
print(f"Лимит API: {e}, повторить через: {e.retry_after}с")
try:
# Ошибка валидации
raise ValidationError("Неверный формат email", "email", "user789")
except ValidationError as e:
print(f"Валидация: {e}, поле: {e.field}")Структурированная обработка ошибок
# Структурированная обработка ошибок
import time
from typing import Callable, Any, Optional
from functools import wraps
class ErrorHandler:
"""Обработчик ошибок для ботов"""
def __init__(self, logger: logging.Logger):
self.logger = logger
self.error_counts = {}
self.last_errors = {}
def handle_error(self, error: Exception, user_id: Optional[str] = None,
context: Optional[Dict[str, Any]] = None):
"""Обработка ошибки"""
error_type = type(error).__name__
timestamp = time.time()
# Подсчет ошибок
self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
self.last_errors[error_type] = timestamp
# Логирование
self.logger.error(
f"Ошибка {error_type}: {str(error)}",
extra={
'user_id': user_id,
'error_type': error_type,
'context': context,
'timestamp': timestamp
}
)
# Дополнительная обработка в зависимости от типа
if isinstance(error, APILimitError):
return self._handle_api_limit_error(error, user_id)
elif isinstance(error, NetworkError):
return self._handle_network_error(error, user_id)
elif isinstance(error, ValidationError):
return self._handle_validation_error(error, user_id)
else:
return self._handle_generic_error(error, user_id)
def _handle_api_limit_error(self, error: APILimitError, user_id: Optional[str]):
"""Обработка ошибки лимита API"""
return {
'message': 'Превышен лимит запросов. Попробуйте позже.',
'retry_after': error.retry_after,
'action': 'wait'
}
def _handle_network_error(self, error: NetworkError, user_id: Optional[str]):
"""Обработка сетевой ошибки"""
return {
'message': 'Проблемы с сетью. Попробуйте еще раз.',
'action': 'retry'
}
def _handle_validation_error(self, error: ValidationError, user_id: Optional[str]):
"""Обработка ошибки валидации"""
return {
'message': f'Ошибка в поле {error.field}: {str(error)}',
'action': 'fix_input'
}
def _handle_generic_error(self, error: Exception, user_id: Optional[str]):
"""Обработка общей ошибки"""
return {
'message': 'Произошла ошибка. Обратитесь к администратору.',
'action': 'contact_support'
}
# Декоратор для автоматической обработки ошибок
def error_handler_decorator(error_handler: ErrorHandler):
"""Декоратор для автоматической обработки ошибок"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(args, *kwargs):
try:
return await func(args, *kwargs)
except Exception as e:
# Извлечение user_id из аргументов
user_id = None
if args and hasattr(args[0], 'from_user'):
user_id = args[0].from_user.id
return error_handler.handle_error(e, user_id)
return wrapper
return decoratorЛогирование и мониторинг
Настройка логирования
# Настройка логирования для ботов
import logging
import logging.handlers
import json
from datetime import datetime
from typing import Dict, Any
class BotLogger:
"""Логгер для ботов"""
def __init__(self, name: str, log_file: str = "bot.log"):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# Форматтер
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Файловый хендлер с ротацией
file_handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=1010241024, backupCount=5
)
file_handler.setFormatter(formatter)
# Консольный хендлер
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
# Добавление хендлеров
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def log_user_action(self, user_id: str, action: str,
details: Optional[Dict[str, Any]] = None):
"""Логирование действий пользователя"""
self.logger.info(
f"User action: {action}",
extra={
'user_id': user_id,
'action': action,
'details': details or {},
'timestamp': datetime.now().isoformat()
}
)
def log_error(self, error: Exception, user_id: Optional[str] = None,
context: Optional[Dict[str, Any]] = None):
"""Логирование ошибок"""
self.logger.error(
f"Error: {str(error)}",
extra={
'user_id': user_id,
'error_type': type(error).__name__,
'context': context or {},
'timestamp': datetime.now().isoformat()
}
)
def log_performance(self, operation: str, duration: float,
user_id: Optional[str] = None):
"""Логирование производительности"""
self.logger.info(
f"Performance: {operation} took {duration:.3f}s",
extra={
'user_id': user_id,
'operation': operation,
'duration': duration,
'timestamp': datetime.now().isoformat()
}
)
# Структурированное логирование
class StructuredLogger:
"""Структурированный логгер"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# JSON форматтер
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
self.logger.addHandler(handler)
def log(self, level: str, message: str, kwargs):
"""Структурированное логирование"""
log_data = {
'timestamp': datetime.now().isoformat(),
'level': level,
'message': message,
kwargs
}
getattr(self.logger, level.lower())(json.dumps(log_data))
class JsonFormatter(logging.Formatter):
"""JSON форматтер для логов"""
def format(self, record):
log_data = {
'timestamp': datetime.fromtimestamp(record.created).isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
# Добавление дополнительных полей
if hasattr(record, 'user_id'):
log_data['user_id'] = record.user_id
if hasattr(record, 'error_type'):
log_data['error_type'] = record.error_type
if hasattr(record, 'context'):
log_data['context'] = record.context
return json.dumps(log_data)Мониторинг и метрики
# Мониторинг и метрики для ботов
import time
from collections import defaultdict, deque
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class Metric:
"""Метрика"""
name: str
value: float
timestamp: float
tags: Dict[str, str]
class BotMetrics:
"""Метрики бота"""
def __init__(self):
self.metrics = defaultdict(list)
self.counters = defaultdict(int)
self.timers = defaultdict(list)
self.gauges = defaultdict(float)
def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
"""Увеличение счетчика"""
self.counters[name] += value
self.metrics[name].append(Metric(name, value, time.time(), tags or {}))
def record_timer(self, name: str, duration: float, tags: Dict[str, str] = None):
"""Запись времени выполнения"""
self.timers[name].append(duration)
self.metrics[name].append(Metric(name, duration, time.time(), tags or {}))
def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
"""Установка значения"""
self.gauges[name] = value
self.metrics[name].append(Metric(name, value, time.time(), tags or {}))
def get_stats(self, name: str) -> Dict[str, float]:
"""Получение статистики"""
if name in self.timers and self.timers[name]:
durations = self.timers[name]
return {
'count': len(durations),
'min': min(durations),
'max': max(durations),
'avg': sum(durations) / len(durations),
'p95': sorted(durations)[int(len(durations) * 0.95)]
}
return {}
def get_recent_metrics(self, name: str, minutes: int = 5) -> List[Metric]:
"""Получение недавних метрик"""
cutoff_time = time.time() - (minutes * 60)
return [m for m in self.metrics[name] if m.timestamp >= cutoff_time]
# Декоратор для автоматического измерения производительности
def measure_performance(metrics: BotMetrics, operation_name: str):
"""Декоратор для измерения производительности"""
def decorator(func):
@wraps(func)
async def wrapper(args, *kwargs):
start_time = time.time()
try:
result = await func(args, *kwargs)
duration = time.time() - start_time
# Запись метрики успеха
metrics.record_timer(f"{operation_name}.success", duration)
metrics.increment_counter(f"{operation_name}.success_count")
return result
except Exception as e:
duration = time.time() - start_time
# Запись метрики ошибки
metrics.record_timer(f"{operation_name}.error", duration)
metrics.increment_counter(f"{operation_name}.error_count")
raise e
return wrapper
return decoratorСтратегии восстановления
Retry механизмы
# Механизмы повторных попыток
import asyncio
import random
from typing import Callable, Any, Optional
from functools import wraps
class RetryStrategy:
"""Стратегия повторных попыток"""
def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
max_delay: float = 60.0, exponential_base: float = 2.0,
jitter: bool = True):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
def get_delay(self, attempt: int) -> float:
"""Получение задержки для попытки"""
delay = self.base_delay (self.exponential_base * attempt)
delay = min(delay, self.max_delay)
if self.jitter:
# Добавление случайности для избежания thundering herd
delay = (0.5 + random.random() 0.5)
return delay
class RetryHandler:
"""Обработчик повторных попыток"""
def __init__(self, strategy: RetryStrategy,
retryable_exceptions: tuple = (Exception,)):
self.strategy = strategy
self.retryable_exceptions = retryable_exceptions
async def execute_with_retry(self, func: Callable, args, *kwargs) -> Any:
"""Выполнение функции с повторными попытками"""
last_exception = None
for attempt in range(self.strategy.max_retries + 1):
try:
if asyncio.iscoroutinefunction(func):
return await func(args, *kwargs)
else:
return func(args, *kwargs)
except self.retryable_exceptions as e:
last_exception = e
if attempt < self.strategy.max_retries:
delay = self.strategy.get_delay(attempt)
print(f"Попытка {attempt + 1} не удалась: {e}. Повтор через {delay:.2f}с")
await asyncio.sleep(delay)
else:
print(f"Все {self.strategy.max_retries + 1} попыток исчерпаны")
break
raise last_exception
# Декоратор для автоматических повторных попыток
def retry(retry_strategy: RetryStrategy,
retryable_exceptions: tuple = (Exception,)):
"""Декоратор для повторных попыток"""
def decorator(func):
@wraps(func)
async def wrapper(args, *kwargs):
handler = RetryHandler(retry_strategy, retryable_exceptions)
return await handler.execute_with_retry(func, args, *kwargs)
return wrapper
return decorator
# Пример использования
@retry(RetryStrategy(max_retries=3, base_delay=1.0))
async def unreliable_api_call():
"""Ненадежный API вызов"""
import random
if random.random() < 0.7: # 70% вероятность ошибки
raise Exception("API недоступен")
return "Успешный ответ"
# Circuit Breaker для предотвращения каскадных сбоев
class CircuitBreaker:
"""Circuit Breaker для предотвращения каскадных сбоев"""
def __init__(self, failure_threshold: int = 5,
recovery_timeout: float = 60.0,
expected_exception: type = Exception):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
async def call(self, func: Callable, args, *kwargs) -> Any:
"""Вызов функции через Circuit Breaker"""
if self.state == "OPEN":
if self._should_attempt_reset():
self.state = "HALF_OPEN"
else:
raise Exception("Circuit Breaker is OPEN")
try:
if asyncio.iscoroutinefunction(func):
result = await func(args, *kwargs)
else:
result = func(args, *kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self) -> bool:
"""Проверка возможности сброса"""
return (time.time() - self.last_failure_time) >= self.recovery_timeout
def _on_success(self):
"""Обработка успешного вызова"""
self.failure_count = 0
self.state = "CLOSED"
def _on_failure(self):
"""Обработка неудачного вызова"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"Предотвращение ошибок
Валидация входных данных
# Валидация входных данных
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
import re
@dataclass
class ValidationResult:
"""Результат валидации"""
is_valid: bool
errors: List[str]
warnings: List[str]
class InputValidator:
"""Валидатор входных данных"""
def __init__(self):
self.rules = {}
def add_rule(self, field: str, rule_func: Callable, error_message: str):
"""Добавление правила валидации"""
if field not in self.rules:
self.rules[field] = []
self.rules[field].append((rule_func, error_message))
def validate(self, data: Dict[str, Any]) -> ValidationResult:
"""Валидация данных"""
errors = []
warnings = []
for field, rules in self.rules.items():
value = data.get(field)
for rule_func, error_message in rules:
try:
if not rule_func(value):
errors.append(f"{field}: {error_message}")
except Exception as e:
errors.append(f"{field}: Ошибка валидации - {str(e)}")
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings
)
# Предопределенные правила валидации
class ValidationRules:
"""Правила валидации"""
@staticmethod
def required(value: Any) -> bool:
"""Проверка обязательности"""
return value is not None and value != ""
@staticmethod
def email(value: str) -> bool:
"""Проверка email"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, value))
@staticmethod
def phone(value: str) -> bool:
"""Проверка телефона"""
pattern = r'^\+?[1-9]\d{1,14}$'
return bool(re.match(pattern, value))
@staticmethod
def min_length(min_len: int):
"""Минимальная длина"""
def rule(value: str) -> bool:
return len(value) >= min_len
return rule
@staticmethod
def max_length(max_len: int):
"""Максимальная длина"""
def rule(value: str) -> bool:
return len(value) <= max_len
return rule
@staticmethod
def numeric(value: str) -> bool:
"""Проверка числового значения"""
try:
float(value)
return True
except (ValueError, TypeError):
return False
@staticmethod
def range(min_val: float, max_val: float):
"""Проверка диапазона"""
def rule(value: Union[str, int, float]) -> bool:
try:
num_val = float(value)
return min_val <= num_val <= max_val
except (ValueError, TypeError):
return False
return rule
# Пример использования валидации
def setup_user_validation():
"""Настройка валидации пользователя"""
validator = InputValidator()
# Правила для email
validator.add_rule("email", ValidationRules.required, "Email обязателен")
validator.add_rule("email", ValidationRules.email, "Неверный формат email")
# Правила для имени
validator.add_rule("name", ValidationRules.required, "Имя обязательно")
validator.add_rule("name", ValidationRules.min_length(2), "Имя должно содержать минимум 2 символа")
validator.add_rule("name", ValidationRules.max_length(50), "Имя не должно превышать 50 символов")
# Правила для возраста
validator.add_rule("age", ValidationRules.numeric, "Возраст должен быть числом")
validator.add_rule("age", ValidationRules.range(13, 120), "Возраст должен быть от 13 до 120 лет")
return validator
# Тестирование валидации
def test_validation():
"""Тестирование валидации"""
validator = setup_user_validation()
# Валидные данные
valid_data = {
"email": "user@example.com",
"name": "Иван Иванов",
"age": "25"
}
result = validator.validate(valid_data)
print(f"Валидные данные: {result.is_valid}, ошибки: {result.errors}")
# Невалидные данные
invalid_data = {
"email": "invalid-email",
"name": "A",
"age": "150"
}
result = validator.validate(invalid_data)
print(f"Невалидные данные: {result.is_valid}, ошибки: {result.errors}")Обработка внешних API
Обработка ошибок API
# Обработка ошибок внешних API
import aiohttp
import asyncio
from typing import Dict, Any, Optional
from enum import Enum
class APIErrorType(Enum):
"""Типы ошибок API"""
RATE_LIMIT = "rate_limit"
AUTHENTICATION = "authentication"
PERMISSION_DENIED = "permission_denied"
NOT_FOUND = "not_found"
SERVER_ERROR = "server_error"
NETWORK_ERROR = "network_error"
TIMEOUT = "timeout"
class APIError(Exception):
"""Ошибка API"""
def __init__(self, message: str, error_type: APIErrorType,
status_code: Optional[int] = None,
retry_after: Optional[int] = None):
super().__init__(message)
self.error_type = error_type
self.status_code = status_code
self.retry_after = retry_after
class APIClient:
"""Клиент для работы с API"""
def __init__(self, base_url: str, api_key: Optional[str] = None,
timeout: float = 30.0, max_retries: int = 3):
self.base_url = base_url
self.api_key = api_key
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.max_retries = max_retries
async def request(self, method: str, endpoint: str,
data: Optional[Dict] = None,
params: Optional[Dict] = None) -> Dict[str, Any]:
"""Выполнение запроса к API"""
url = f"{self.base_url}/{endpoint}"
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
async with aiohttp.ClientSession(timeout=self.timeout) as session:
for attempt in range(self.max_retries):
try:
async with session.request(
method, url, json=data, params=params, headers=headers
) as response:
return await self._handle_response(response)
except asyncio.TimeoutError:
if attempt < self.max_retries - 1:
await asyncio.sleep(2 attempt) # Экспоненциальная задержка
continue
raise APIError("Таймаут запроса", APIErrorType.TIMEOUT)
except aiohttp.ClientError as e:
if attempt < self.max_retries - 1:
await asyncio.sleep(2 attempt)
continue
raise APIError(f"Ошибка сети: {str(e)}", APIErrorType.NETWORK_ERROR)
async def _handle_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]:
"""Обработка ответа API"""
try:
data = await response.json()
except aiohttp.ContentTypeError:
data = {"error": "Неверный формат ответа"}
if response.status == 200:
return data
elif response.status == 401:
raise APIError("Ошибка аутентификации", APIErrorType.AUTHENTICATION, 401)
elif response.status == 403:
raise APIError("Доступ запрещен", APIErrorType.PERMISSION_DENIED, 403)
elif response.status == 404:
raise APIError("Ресурс не найден", APIErrorType.NOT_FOUND, 404)
elif response.status == 429:
retry_after = int(response.headers.get("Retry-After", 60))
raise APIError("Превышен лимит запросов", APIErrorType.RATE_LIMIT, 429, retry_after)
elif response.status >= 500:
raise APIError("Ошибка сервера", APIErrorType.SERVER_ERROR, response.status)
else:
raise APIError(f"Неизвестная ошибка: {response.status}", APIErrorType.SERVER_ERROR, response.status)
# Обработчик ошибок API для ботов
class APIErrorHandler:
"""Обработчик ошибок API"""
def __init__(self, logger: logging.Logger):
self.logger = logger
self.rate_limit_delays = {}
async def handle_api_error(self, error: APIError, user_id: Optional[str] = None) -> Dict[str, Any]:
"""Обработка ошибки API"""
self.logger.error(f"API ошибка: {error.error_type.value} - {str(error)}")
if error.error_type == APIErrorType.RATE_LIMIT:
return await self._handle_rate_limit(error, user_id)
elif error.error_type == APIErrorType.AUTHENTICATION:
return await self._handle_authentication_error(error, user_id)
elif error.error_type == APIErrorType.NETWORK_ERROR:
return await self._handle_network_error(error, user_id)
else:
return await self._handle_generic_api_error(error, user_id)
async def _handle_rate_limit(self, error: APIError, user_id: Optional[str]) -> Dict[str, Any]:
"""Обработка ошибки лимита запросов"""
retry_after = error.retry_after or 60
# Сохранение информации о задержке для пользователя
if user_id:
self.rate_limit_delays[user_id] = time.time() + retry_after
return {
"message": f"Превышен лимит запросов. Попробуйте через {retry_after} секунд.",
"retry_after": retry_after,
"action": "wait"
}
async def _handle_authentication_error(self, error: APIError, user_id: Optional[str]) -> Dict[str, Any]:
"""Обработка ошибки аутентификации"""
return {
"message": "Ошибка аутентификации. Обратитесь к администратору.",
"action": "contact_support"
}
async def _handle_network_error(self, error: APIError, user_id: Optional[str]) -> Dict[str, Any]:
"""Обработка сетевой ошибки"""
return {
"message": "Проблемы с сетью. Попробуйте еще раз.",
"action": "retry"
}
async def _handle_generic_api_error(self, error: APIError, user_id: Optional[str]) -> Dict[str, Any]:
"""Обработка общей ошибки API"""
return {
"message": "Временная ошибка сервиса. Попробуйте позже.",
"action": "retry_later"
}Тестирование обработки ошибок
Unit тесты для обработки ошибок
# Тесты для обработки ошибок
import pytest
import asyncio
from unittest.mock import Mock, patch
import time
class TestErrorHandling:
"""Тесты обработки ошибок"""
def test_network_error_handling(self):
"""Тест обработки сетевой ошибки"""
error_handler = ErrorHandler(logging.getLogger())
error = NetworkError("Сеть недоступна", "user123")
result = error_handler.handle_error(error, "user123")
assert result["action"] == "retry"
assert "сеть" in result["message"].lower()
def test_api_limit_error_handling(self):
"""Тест обработки ошибки лимита API"""
error_handler = ErrorHandler(logging.getLogger())
error = APILimitError("Лимит превышен", 60, "user456")
result = error_handler.handle_error(error, "user456")
assert result["action"] == "wait"
assert result["retry_after"] == 60
def test_validation_error_handling(self):
"""Тест обработки ошибки валидации"""
error_handler = ErrorHandler(logging.getLogger())
error = ValidationError("Неверный email", "email", "user789")
result = error_handler.handle_error(error, "user789")
assert result["action"] == "fix_input"
assert "email" in result["message"]
@pytest.mark.asyncio
async def test_retry_mechanism(self):
"""Тест механизма повторных попыток"""
call_count = 0
async def failing_function():
nonlocal call_count
call_count += 1
if call_count < 3:
raise Exception("Временная ошибка")
return "Успех"
strategy = RetryStrategy(max_retries=3, base_delay=0.1)
handler = RetryHandler(strategy)
result = await handler.execute_with_retry(failing_function)
assert result == "Успех"
assert call_count == 3
@pytest.mark.asyncio
async def test_circuit_breaker(self):
"""Тест Circuit Breaker"""
circuit_breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.1)
async def failing_function():
raise Exception("Ошибка")
# Первые два вызова должны пройти
for _ in range(2):
with pytest.raises(Exception):
await circuit_breaker.call(failing_function)
# Третий вызов должен быть заблокирован
with pytest.raises(Exception, match="Circuit Breaker is OPEN"):
await circuit_breaker.call(failing_function)
# После таймаута должен перейти в HALF_OPEN
await asyncio.sleep(0.2)
with pytest.raises(Exception):
await circuit_breaker.call(failing_function)
def test_input_validation(self):
"""Тест валидации входных данных"""
validator = InputValidator()
validator.add_rule("email", ValidationRules.email, "Неверный email")
validator.add_rule("name", ValidationRules.required, "Имя обязательно")
# Валидные данные
valid_data = {"email": "test@example.com", "name": "Иван"}
result = validator.validate(valid_data)
assert result.is_valid
# Невалидные данные
invalid_data = {"email": "invalid-email", "name": ""}
result = validator.validate(invalid_data)
assert not result.is_valid
assert len(result.errors) == 2
# Интеграционные тесты
class TestIntegration:
"""Интеграционные тесты"""
@pytest.mark.asyncio
async def test_api_client_error_handling(self):
"""Тест обработки ошибок API клиента"""
with patch('aiohttp.ClientSession') as mock_session:
# Настройка мока для ошибки 429
mock_response = Mock()
mock_response.status = 429
mock_response.headers = {"Retry-After": "60"}
mock_response.json.return_value = {"error": "Rate limit exceeded"}
mock_session.return_value.__aenter__.return_value.request.return_value.__aenter__.return_value = mock_response
client = APIClient("https://api.example.com")
with pytest.raises(APIError) as exc_info:
await client.request("GET", "test")
assert exc_info.value.error_type == APIErrorType.RATE_LIMIT
assert exc_info.value.retry_after == 60Мониторинг в продакшене
Система мониторинга
# Система мониторинга для продакшена
import asyncio
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
@dataclass
class Alert:
"""Алерт"""
id: str
type: str
severity: str
message: str
timestamp: float
metadata: Dict[str, Any]
class MonitoringSystem:
"""Система мониторинга"""
def __init__(self):
self.metrics = BotMetrics()
self.alerts = []
self.alert_rules = {}
self.health_checks = {}
def add_alert_rule(self, name: str, condition_func: Callable,
severity: str = "warning"):
"""Добавление правила алерта"""
self.alert_rules[name] = {
"condition": condition_func,
"severity": severity
}
def add_health_check(self, name: str, check_func: Callable):
"""Добавление проверки здоровья"""
self.health_checks[name] = check_func
async def check_alerts(self):
"""Проверка алертов"""
for name, rule in self.alert_rules.items():
try:
if rule["condition"]():
alert = Alert(
id=f"{name}_{int(time.time())}",
type=name,
severity=rule["severity"],
message=f"Сработал алерт: {name}",
timestamp=time.time(),
metadata={}
)
self.alerts.append(alert)
await self._send_alert(alert)
except Exception as e:
print(f"Ошибка проверки алерта {name}: {e}")
async def run_health_checks(self) -> Dict[str, bool]:
"""Запуск проверок здоровья"""
results = {}
for name, check_func in self.health_checks.items():
try:
if asyncio.iscoroutinefunction(check_func):
result = await check_func()
else:
result = check_func()
results[name] = result
except Exception as e:
print(f"Ошибка проверки здоровья {name}: {e}")
results[name] = False
return results
async def _send_alert(self, alert: Alert):
"""Отправка алерта"""
# Здесь можно интегрировать с внешними системами мониторинга
print(f"🚨 АЛЕРТ [{alert.severity.upper()}]: {alert.message}")
# Отправка в Slack, Telegram, email и т.д.
# await self._send_to_slack(alert)
# await self._send_to_telegram(alert)
def get_system_status(self) -> Dict[str, Any]:
"""Получение статуса системы"""
return {
"timestamp": datetime.now().isoformat(),
"metrics": {
"total_requests": sum(self.metrics.counters.values()),
"error_rate": self._calculate_error_rate(),
"avg_response_time": self._calculate_avg_response_time()
},
"alerts": [asdict(alert) for alert in self.alerts[-10:]], # Последние 10 алертов
"health_checks": asyncio.run(self.run_health_checks())
}
def _calculate_error_rate(self) -> float:
"""Расчет процента ошибок"""
total_requests = sum(self.metrics.counters.values())
error_requests = sum(
count for name, count in self.metrics.counters.items()
if "error" in name.lower()
)
return (error_requests / total_requests * 100) if total_requests > 0 else 0
def _calculate_avg_response_time(self) -> float:
"""Расчет среднего времени ответа"""
all_times = []
for times in self.metrics.timers.values():
all_times.extend(times)
return sum(all_times) / len(all_times) if all_times else 0
# Примеры правил мониторинга
def setup_monitoring_rules(monitoring: MonitoringSystem):
"""Настройка правил мониторинга"""
# Правило для высокой частоты ошибок
def high_error_rate():
error_rate = monitoring._calculate_error_rate()
return error_rate > 10 # Более 10% ошибок
monitoring.add_alert_rule("high_error_rate", high_error_rate, "critical")
# Правило для медленного ответа
def slow_response():
avg_time = monitoring._calculate_avg_response_time()
return avg_time > 5.0 # Более 5 секунд
monitoring.add_alert_rule("slow_response", slow_response, "warning")
# Проверка здоровья базы данных
async def database_health():
try:
# Здесь должна быть проверка подключения к БД
return True
except Exception:
return False
monitoring.add_health_check("database", database_health)
# Проверка внешнего API
async def external_api_health():
try:
# Проверка доступности внешнего API
return True
except Exception:
return False
monitoring.add_health_check("external_api", external_api_health)
# Запуск мониторинга
async def run_monitoring():
"""Запуск системы мониторинга"""
monitoring = MonitoringSystem()
setup_monitoring_rules(monitoring)
while True:
try:
await monitoring.check_alerts()
await asyncio.sleep(60) # Проверка каждую минуту
except Exception as e:
print(f"Ошибка мониторинга: {e}")
await asyncio.sleep(60)Заключение
В этой статье мы рассмотрели комплексный подход к обработке ошибок в ботах:- ✅ Классификация и структурирование ошибок
- ✅ Логирование и мониторинг
- ✅ Стратегии восстановления и повторных попыток
- ✅ Предотвращение ошибок через валидацию
- ✅ Обработка ошибок внешних API
- ✅ Тестирование обработки ошибок
- ✅ Мониторинг в продакшене
Полезные ссылки
771 просмотров
50 лайков
0 комментариев
Комментарии (0)
Пока нет комментариев. Будьте первым!