Логирование и мониторинг Python приложений

В этой статье мы рассмотрим эффективные стратегии логирования и мониторинга Python приложений, включая структурированное логирование, метрики, алерты и интеграцию с внешними системами мониторинга.

Содержание

Основы логирования

Настройка логирования

# Настройка логирования для Python приложений
import logging
import logging.handlers
import json
from datetime import datetime
from typing import Dict, Any, Optional
import os

class LoggingConfig:
    """Конфигурация логирования"""
    
    def __init__(self, app_name: str, log_level: str = "INFO"):
        self.app_name = app_name
        self.log_level = getattr(logging, log_level.upper())
        self.setup_logging()
    
    def setup_logging(self):
        """Настройка системы логирования"""
        # Создание логгера
        logger = logging.getLogger(self.app_name)
        logger.setLevel(self.log_level)
        
        # Очистка существующих хендлеров
        logger.handlers.clear()
        
        # Форматтер
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        # Консольный хендлер
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
        
        # Файловый хендлер с ротацией
        if not os.path.exists('logs'):
            os.makedirs('logs')
        
        file_handler = logging.handlers.RotatingFileHandler(
            f'logs/{self.app_name}.log',
            maxBytes=1010241024,  # 10MB
            backupCount=5
        )
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        
        # Хендлер для ошибок
        error_handler = logging.handlers.RotatingFileHandler(
            f'logs/{self.app_name}_errors.log',
            maxBytes=510241024,  # 5MB
            backupCount=3
        )
        error_handler.setLevel(logging.ERROR)
        error_handler.setFormatter(formatter)
        logger.addHandler(error_handler)
        
        return logger

# Использование
logger_config = LoggingConfig("my_bot", "INFO")
logger = logging.getLogger("my_bot")

# Примеры логирования
logger.info("Приложение запущено")
logger.warning("Предупреждение: низкая память")
logger.error("Ошибка подключения к базе данных")
logger.debug("Отладочная информация")

Контекстное логирование

# Контекстное логирование
import logging
from contextvars import ContextVar
from typing import Dict, Any

# Контекстные переменные
user_id_context: ContextVar[str] = ContextVar('user_id', default=None)
request_id_context: ContextVar[str] = ContextVar('request_id', default=None)
session_id_context: ContextVar[str] = ContextVar('session_id', default=None)

class ContextualLogger:
    """Контекстный логгер"""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
    
    def _get_context(self) -> Dict[str, Any]:
        """Получение контекста"""
        context = {}
        
        user_id = user_id_context.get()
        if user_id:
            context['user_id'] = user_id
        
        request_id = request_id_context.get()
        if request_id:
            context['request_id'] = request_id
        
        session_id = session_id_context.get()
        if session_id:
            context['session_id'] = session_id
        
        return context
    
    def info(self, message: str, kwargs):
        """Информационное сообщение с контекстом"""
        context = self._get_context()
        context.update(kwargs)
        
        self.logger.info(
            message,
            extra={'context': context}
        )
    
    def error(self, message: str, kwargs):
        """Сообщение об ошибке с контекстом"""
        context = self._get_context()
        context.update(kwargs)
        
        self.logger.error(
            message,
            extra={'context': context}
        )
    
    def warning(self, message: str, kwargs):
        """Предупреждение с контекстом"""
        context = self._get_context()
        context.update(kwargs)
        
        self.logger.warning(
            message,
            extra={'context': context}
        )

# Контекстный менеджер для установки контекста
class LoggingContext:
    """Контекстный менеджер для логирования"""
    
    def __init__(self, user_id: Optional[str] = None,
                 request_id: Optional[str] = None,
                 session_id: Optional[str] = None):
        self.user_id = user_id
        self.request_id = request_id
        self.session_id = session_id
        self.tokens = []
    
    def __enter__(self):
        """Вход в контекст"""
        if self.user_id:
            token = user_id_context.set(self.user_id)
            self.tokens.append(token)
        
        if self.request_id:
            token = request_id_context.set(self.request_id)
            self.tokens.append(token)
        
        if self.session_id:
            token = session_id_context.set(self.session_id)
            self.tokens.append(token)
        
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Выход из контекста"""
        for token in self.tokens:
            token.reset()

# Пример использования
contextual_logger = ContextualLogger("bot")

def process_user_request(user_id: str, request_data: Dict[str, Any]):
    """Обработка запроса пользователя с логированием"""
    with LoggingContext(user_id=user_id, request_id="req_123"):
        contextual_logger.info("Начало обработки запроса", data=request_data)
        
        try:
            # Обработка запроса
            result = {"status": "success", "data": "processed"}
            contextual_logger.info("Запрос успешно обработан", result=result)
            return result
        except Exception as e:
            contextual_logger.error("Ошибка обработки запроса", error=str(e))
            raise

Структурированное логирование

JSON логирование

# JSON логирование
import json
import logging
from datetime import datetime
from typing import Any, Dict

class JSONFormatter(logging.Formatter):
    """JSON форматтер для логов"""
    
    def format(self, record: logging.LogRecord) -> str:
        """Форматирование записи в JSON"""
        log_entry = {
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        
        # Добавление контекста
        if hasattr(record, 'context'):
            log_entry['context'] = record.context
        
        # Добавление исключения
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        
        return json.dumps(log_entry, ensure_ascii=False)

class StructuredLogger:
    """Структурированный логгер"""
    
    def __init__(self, name: str, log_file: Optional[str] = None):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # Очистка существующих хендлеров
        self.logger.handlers.clear()
        
        # JSON форматтер
        json_formatter = JSONFormatter()
        
        # Консольный хендлер
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(json_formatter)
        self.logger.addHandler(console_handler)
        
        # Файловый хендлер
        if log_file:
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(json_formatter)
            self.logger.addHandler(file_handler)
    
    def log_event(self, event_type: str, message: str, kwargs):
        """Логирование события"""
        self.logger.info(
            message,
            extra={
                'context': {
                    'event_type': event_type,
                    kwargs
                }
            }
        )
    
    def log_user_action(self, user_id: str, action: str, kwargs):
        """Логирование действия пользователя"""
        self.logger.info(
            f"User action: {action}",
            extra={
                'context': {
                    'user_id': user_id,
                    'action': action,
                    kwargs
                }
            }
        )
    
    def log_performance(self, operation: str, duration: float, kwargs):
        """Логирование производительности"""
        self.logger.info(
            f"Performance: {operation}",
            extra={
                'context': {
                    'operation': operation,
                    'duration': duration,
                    'performance_metric': True,
                    kwargs
                }
            }
        )
    
    def log_error(self, error: Exception, context: Dict[str, Any] = None):
        """Логирование ошибки"""
        self.logger.error(
            f"Error: {str(error)}",
            extra={
                'context': context or {},
                'error_type': type(error).__name__
            },
            exc_info=True
        )

# Пример использования
structured_logger = StructuredLogger("bot", "logs/bot_structured.log")

# Логирование событий
structured_logger.log_event(
    "user_registration",
    "Новый пользователь зарегистрирован",
    user_id="12345",
    email="user@example.com"
)

# Логирование производительности
import time
start_time = time.time()
# ... выполнение операции ...
duration = time.time() - start_time
structured_logger.log_performance(
    "database_query",
    duration,
    query_type="SELECT",
    table="users"
)

Логирование с фильтрацией

# Логирование с фильтрацией
import logging
from typing import Set, Optional

class SensitiveDataFilter(logging.Filter):
    """Фильтр для скрытия чувствительных данных"""
    
    def __init__(self, sensitive_fields: Set[str]):
        self.sensitive_fields = sensitive_fields
    
    def filter(self, record: logging.LogRecord) -> bool:
        """Фильтрация чувствительных данных"""
        if hasattr(record, 'context'):
            context = record.context.copy()
            
            for field in self.sensitive_fields:
                if field in context:
                    context[field] = "REDACTED"
            
            record.context = context
        
        return True

class LogLevelFilter(logging.Filter):
    """Фильтр по уровню логирования"""
    
    def __init__(self, min_level: int):
        self.min_level = min_level
    
    def filter(self, record: logging.LogRecord) -> bool:
        """Фильтрация по уровню"""
        return record.levelno >= self.min_level

class ConditionalLogger:
    """Условный логгер"""
    
    def __init__(self, name: str, enable_debug: bool = False):
        self.logger = logging.getLogger(name)
        self.enable_debug = enable_debug
        
        # Настройка фильтров
        sensitive_filter = SensitiveDataFilter({
            'password', 'token', 'api_key', 'secret'
        })
        self.logger.addFilter(sensitive_filter)
        
        if not enable_debug:
            debug_filter = LogLevelFilter(logging.INFO)
            self.logger.addFilter(debug_filter)
    
    def debug(self, message: str, kwargs):
        """Отладочное сообщение"""
        if self.enable_debug:
            self.logger.debug(message, extra={'context': kwargs})
    
    def info(self, message: str, kwargs):
        """Информационное сообщение"""
        self.logger.info(message, extra={'context': kwargs})
    
    def warning(self, message: str, kwargs):
        """Предупреждение"""
        self.logger.warning(message, extra={'context': kwargs})
    
    def error(self, message: str, kwargs):
        """Ошибка"""
        self.logger.error(message, extra={'context': kwargs})

# Пример использования
conditional_logger = ConditionalLogger("bot", enable_debug=True)

# Логирование с чувствительными данными
conditional_logger.info(
    "Пользователь авторизован",
    user_id="12345",
    password="secret123",  # Будет скрыто
    api_key="sk-123456789"  # Будет скрыто
)

Метрики и производительность

Система метрик

# Система метрик для мониторинга
import time
import threading
from collections import defaultdict, deque
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Metric:
    """Метрика"""
    name: str
    value: float
    timestamp: float
    tags: Dict[str, str]

class MetricsCollector:
    """Сборщик метрик"""
    
    def __init__(self):
        self.counters = defaultdict(int)
        self.gauges = defaultdict(float)
        self.histograms = defaultdict(list)
        self.timers = defaultdict(list)
        self.lock = threading.Lock()
    
    def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
        """Увеличение счетчика"""
        with self.lock:
            self.counters[name] += value
    
    def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Установка значения"""
        with self.lock:
            self.gauges[name] = value
    
    def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None):
        """Запись в гистограмму"""
        with self.lock:
            self.histograms[name].append(value)
    
    def record_timer(self, name: str, duration: float, tags: Dict[str, str] = None):
        """Запись времени выполнения"""
        with self.lock:
            self.timers[name].append(duration)
    
    def get_counter(self, name: str) -> int:
        """Получение значения счетчика"""
        with self.lock:
            return self.counters[name]
    
    def get_gauge(self, name: str) -> float:
        """Получение значения gauge"""
        with self.lock:
            return self.gauges[name]
    
    def get_histogram_stats(self, name: str) -> Dict[str, float]:
        """Получение статистики гистограммы"""
        with self.lock:
            values = self.histograms[name]
            if not values:
                return {}
            
            return {
                'count': len(values),
                'min': min(values),
                'max': max(values),
                'avg': sum(values) / len(values),
                'p50': self._percentile(values, 50),
                'p95': self._percentile(values, 95),
                'p99': self._percentile(values, 99)
            }
    
    def get_timer_stats(self, name: str) -> Dict[str, float]:
        """Получение статистики таймера"""
        with self.lock:
            values = self.timers[name]
            if not values:
                return {}
            
            return {
                'count': len(values),
                'min': min(values),
                'max': max(values),
                'avg': sum(values) / len(values),
                'p50': self._percentile(values, 50),
                'p95': self._percentile(values, 95),
                'p99': self._percentile(values, 99)
            }
    
    def _percentile(self, values: List[float], percentile: int) -> float:
        """Вычисление перцентиля"""
        sorted_values = sorted(values)
        index = int(len(sorted_values) * percentile / 100)
        return sorted_values[min(index, len(sorted_values) - 1)]
    
    def get_all_metrics(self) -> Dict[str, Any]:
        """Получение всех метрик"""
        with self.lock:
            return {
                'counters': dict(self.counters),
                'gauges': dict(self.gauges),
                'histograms': {name: self.get_histogram_stats(name) 
                             for name in self.histograms},
                'timers': {name: self.get_timer_stats(name) 
                          for name in self.timers}
            }

# Декоратор для автоматического измерения производительности
def measure_performance(metrics: MetricsCollector, operation_name: str):
    """Декоратор для измерения производительности"""
    def decorator(func):
        def wrapper(args, *kwargs):
            start_time = time.time()
            try:
                result = func(args, *kwargs)
                duration = time.time() - start_time
                
                # Запись метрики успеха
                metrics.record_timer(f"{operation_name}.success", duration)
                metrics.increment_counter(f"{operation_name}.success_count")
                
                return result
            except Exception as e:
                duration = time.time() - start_time
                
                # Запись метрики ошибки
                metrics.record_timer(f"{operation_name}.error", duration)
                metrics.increment_counter(f"{operation_name}.error_count")
                
                raise e
        return wrapper
    return decorator

# Пример использования
metrics = MetricsCollector()

@measure_performance(metrics, "database_query")
def database_query(query: str):
    """Имитация запроса к базе данных"""
    time.sleep(0.1)  # Имитация работы
    return {"result": "data"}

# Использование метрик
metrics.increment_counter("requests_total")
metrics.set_gauge("active_connections", 42)
metrics.record_histogram("response_size", 1024)

# Выполнение операций
for i in range(10):
    try:
        database_query("SELECT * FROM users")
    except Exception as e:
        pass

# Получение статистики
stats = metrics.get_timer_stats("database_query")
print(f"Статистика запросов к БД: {stats}")

Мониторинг ресурсов

# Мониторинг ресурсов системы
import psutil
import threading
import time
from typing import Dict, Any

class SystemMonitor:
    """Мониторинг системных ресурсов"""
    
    def __init__(self, metrics: MetricsCollector):
        self.metrics = metrics
        self.monitoring = False
        self.monitor_thread = None
    
    def start_monitoring(self, interval: float = 60.0):
        """Запуск мониторинга"""
        if self.monitoring:
            return
        
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True
        )
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """Остановка мониторинга"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def _monitor_loop(self, interval: float):
        """Цикл мониторинга"""
        while self.monitoring:
            try:
                self._collect_system_metrics()
                time.sleep(interval)
            except Exception as e:
                print(f"Ошибка мониторинга: {e}")
                time.sleep(interval)
    
    def _collect_system_metrics(self):
        """Сбор системных метрик"""
        # CPU
        cpu_percent = psutil.cpu_percent(interval=1)
        self.metrics.set_gauge("system.cpu_percent", cpu_percent)
        
        # Память
        memory = psutil.virtual_memory()
        self.metrics.set_gauge("system.memory_percent", memory.percent)
        self.metrics.set_gauge("system.memory_available_mb", memory.available / 1024 / 1024)
        
        # Диск
        disk = psutil.disk_usage('/')
        self.metrics.set_gauge("system.disk_percent", disk.percent)
        self.metrics.set_gauge("system.disk_free_gb", disk.free / 1024 / 1024 / 1024)
        
        # Сетевые соединения
        connections = len(psutil.net_connections())
        self.metrics.set_gauge("system.network_connections", connections)
        
        # Процессы
        processes = len(psutil.pids())
        self.metrics.set_gauge("system.processes_count", processes)
    
    def get_system_info(self) -> Dict[str, Any]:
        """Получение информации о системе"""
        return {
            'cpu_percent': psutil.cpu_percent(),
            'memory': {
                'total': psutil.virtual_memory().total,
                'available': psutil.virtual_memory().available,
                'percent': psutil.virtual_memory().percent
            },
            'disk': {
                'total': psutil.disk_usage('/').total,
                'free': psutil.disk_usage('/').free,
                'percent': psutil.disk_usage('/').percent
            },
            'network_connections': len(psutil.net_connections()),
            'processes': len(psutil.pids())
        }

# Пример использования
metrics = MetricsCollector()
system_monitor = SystemMonitor(metrics)

# Запуск мониторинга
system_monitor.start_monitoring(interval=30.0)  # Каждые 30 секунд

# Получение информации о системе
system_info = system_monitor.get_system_info()
print(f"Информация о системе: {system_info}")

Мониторинг в реальном времени

Веб-интерфейс мониторинга

# Веб-интерфейс для мониторинга
from flask import Flask, jsonify, render_template_string
import json
from datetime import datetime

app = Flask(__name__)

# HTML шаблон для дашборда
DASHBOARD_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>Мониторинг бота</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .metric-card { 
            border: 1px solid #ddd; 
            padding: 15px; 
            margin: 10px; 
            border-radius: 5px;
            display: inline-block;
            width: 200px;
        }
        .metric-value { font-size: 24px; font-weight: bold; color: #333; }
        .metric-label { color: #666; }
        .chart-container { width: 400px; height: 200px; margin: 20px; }
    </style>
</head>
<body>
    <h1>Мониторинг бота</h1>
    
    <div id="metrics">
        <div class="metric-card">
            <div class="metric-value" id="requests-total">0</div>
            <div class="metric-label">Всего запросов</div>
        </div>
        
        <div class="metric-card">
            <div class="metric-value" id="error-rate">0%</div>
            <div class="metric-label">Процент ошибок</div>
        </div>
        
        <div class="metric-card">
            <div class="metric-value" id="avg-response">0ms</div>
            <div class="metric-label">Среднее время ответа</div>
        </div>
        
        <div class="metric-card">
            <div class="metric-value" id="cpu-usage">0%</div>
            <div class="metric-label">Использование CPU</div>
        </div>
    </div>
    
    <div class="chart-container">
        <canvas id="responseTimeChart"></canvas>
    </div>
    
    <div class="chart-container">
        <canvas id="errorRateChart"></canvas>
    </div>
    
    <script>
        // Обновление метрик каждые 5 секунд
        setInterval(updateMetrics, 5000);
        
        function updateMetrics() {
            fetch('/api/metrics')
                .then(response => response.json())
                .then(data => {
                    document.getElementById('requests-total').textContent = data.counters['requests_total'] || 0;
                    document.getElementById('error-rate').textContent = 
                        ((data.counters['database_query.error_count'] || 0) / 
                         (data.counters['database_query.success_count'] || 1) * 100).toFixed(1) + '%';
                    document.getElementById('avg-response').textContent = 
                        (data.timers['database_query.success']?.avg * 1000 || 0).toFixed(0) + 'ms';
                    document.getElementById('cpu-usage').textContent = 
                        (data.gauges['system.cpu_percent'] || 0).toFixed(1) + '%';
                });
        }
        
        // Инициализация графиков
        const responseTimeCtx = document.getElementById('responseTimeChart').getContext('2d');
        const responseTimeChart = new Chart(responseTimeCtx, {
            type: 'line',
            data: {
                labels: [],
                datasets: [{
                    label: 'Время ответа (мс)',
                    data: [],
                    borderColor: 'rgb(75, 192, 192)',
                    tension: 0.1
                }]
            },
            options: {
                responsive: true,
                scales: {
                    y: {
                        beginAtZero: true
                    }
                }
            }
        });
        
        // Обновление графиков
        setInterval(() => {
            fetch('/api/metrics')
                .then(response => response.json())
                .then(data => {
                    const now = new Date().toLocaleTimeString();
                    const avgTime = data.timers['database_query.success']?.avg * 1000 || 0;
                    
                    responseTimeChart.data.labels.push(now);
                    responseTimeChart.data.datasets[0].data.push(avgTime);
                    
                    if (responseTimeChart.data.labels.length > 20) {
                        responseTimeChart.data.labels.shift();
                        responseTimeChart.data.datasets[0].data.shift();
                    }
                    
                    responseTimeChart.update();
                });
        }, 5000);
        
        // Первоначальная загрузка
        updateMetrics();
    </script>
</body>
</html>
"""

@app.route('/')
def dashboard():
    """Дашборд мониторинга"""
    return render_template_string(DASHBOARD_TEMPLATE)

@app.route('/api/metrics')
def get_metrics():
    """API для получения метрик"""
    return jsonify(metrics.get_all_metrics())

@app.route('/api/system')
def get_system_info():
    """API для получения информации о системе"""
    return jsonify(system_monitor.get_system_info())

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

Интеграция с внешними системами

Интеграция с Prometheus

# Интеграция с Prometheus
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Создание метрик Prometheus
REQUEST_COUNT = Counter('bot_requests_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('bot_request_duration_seconds', 'Request duration')
ACTIVE_CONNECTIONS = Gauge('bot_active_connections', 'Active connections')
ERROR_COUNT = Counter('bot_errors_total', 'Total errors', ['error_type'])

class PrometheusMetrics:
    """Метрики Prometheus"""
    
    def __init__(self, port: int = 8000):
        self.port = port
        start_http_server(port)
    
    def record_request(self, method: str, endpoint: str, duration: float):
        """Запись метрики запроса"""
        REQUEST_COUNT.labels(method=method, endpoint=endpoint).inc()
        REQUEST_DURATION.observe(duration)
    
    def set_active_connections(self, count: int):
        """Установка количества активных соединений"""
        ACTIVE_CONNECTIONS.set(count)
    
    def record_error(self, error_type: str):
        """Запись ошибки"""
        ERROR_COUNT.labels(error_type=error_type).inc()

# Пример использования
prometheus_metrics = PrometheusMetrics(port=8000)

# Запись метрики
prometheus_metrics.record_request('GET', '/api/users', 0.123)
prometheus_metrics.set_active_connections(42)
prometheus_metrics.record_error('database_error')

Интеграция с Grafana

# Интеграция с Grafana через InfluxDB
from influxdb import InfluxDBClient
import json
from datetime import datetime

class InfluxDBMetrics:
    """Метрики для InfluxDB"""
    
    def __init__(self, host: str = 'localhost', port: int = 8086,
                 database: str = 'bot_metrics'):
        self.client = InfluxDBClient(host=host, port=port)
        self.database = database
        self._ensure_database()
    
    def _ensure_database(self):
        """Создание базы данных если не существует"""
        databases = self.client.get_list_database()
        if not any(db['name'] == self.database for db in databases):
            self.client.create_database(self.database)
        
        self.client.switch_database(self.database)
    
    def write_metric(self, measurement: str, fields: Dict[str, Any],
                     tags: Dict[str, str] = None):
        """Запись метрики"""
        point = {
            'measurement': measurement,
            'time': datetime.utcnow(),
            'fields': fields,
            'tags': tags or {}
        }
        
        self.client.write_points([point])
    
    def write_request_metric(self, method: str, endpoint: str, 
                           duration: float, status_code: int):
        """Запись метрики запроса"""
        self.write_metric(
            'requests',
            {
                'duration': duration,
                'status_code': status_code
            },
            {
                'method': method,
                'endpoint': endpoint
            }
        )
    
    def write_system_metric(self, cpu_percent: float, memory_percent: float):
        """Запись системной метрики"""
        self.write_metric(
            'system',
            {
                'cpu_percent': cpu_percent,
                'memory_percent': memory_percent
            }
        )

# Пример использования
influx_metrics = InfluxDBMetrics()

# Запись метрик
influx_metrics.write_request_metric('GET', '/api/users', 0.123, 200)
influx_metrics.write_system_metric(45.2, 67.8)

Алерты и уведомления

Система алертов

# Система алертов
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum

class AlertSeverity(Enum):
    """Уровни серьезности алертов"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class Alert:
    """Алерт"""
    id: str
    title: str
    message: str
    severity: AlertSeverity
    timestamp: datetime
    metadata: Dict[str, Any]

class AlertManager:
    """Менеджер алертов"""
    
    def __init__(self):
        self.alerts = []
        self.notification_channels = []
    
    def add_notification_channel(self, channel):
        """Добавление канала уведомлений"""
        self.notification_channels.append(channel)
    
    def create_alert(self, title: str, message: str, 
                    severity: AlertSeverity, metadata: Dict[str, Any] = None):
        """Создание алерта"""
        alert = Alert(
            id=f"alert_{int(time.time())}",
            title=title,
            message=message,
            severity=severity,
            timestamp=datetime.now(),
            metadata=metadata or {}
        )
        
        self.alerts.append(alert)
        self._send_notifications(alert)
        
        return alert
    
    def _send_notifications(self, alert: Alert):
        """Отправка уведомлений"""
        for channel in self.notification_channels:
            try:
                channel.send(alert)
            except Exception as e:
                print(f"Ошибка отправки уведомления: {e}")

class EmailNotificationChannel:
    """Канал уведомлений по email"""
    
    def __init__(self, smtp_server: str, smtp_port: int,
                 username: str, password: str, recipients: List[str]):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.recipients = recipients
    
    def send(self, alert: Alert):
        """Отправка email уведомления"""
        msg = MIMEMultipart()
        msg['From'] = self.username
        msg['To'] = ', '.join(self.recipients)
        msg['Subject'] = f"[{alert.severity.value.upper()}] {alert.title}"
        
        body = f"""
        Алерт: {alert.title}
        
        Сообщение: {alert.message}
        
        Серьезность: {alert.severity.value}
        Время: {alert.timestamp}
        
        Метаданные: {json.dumps(alert.metadata, indent=2)}
        """
        
        msg.attach(MIMEText(body, 'plain'))
        
        with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(msg)

class SlackNotificationChannel:
    """Канал уведомлений в Slack"""
    
    def __init__(self, webhook_url: str, channel: str = "#alerts"):
        self.webhook_url = webhook_url
        self.channel = channel
    
    def send(self, alert: Alert):
        """Отправка уведомления в Slack"""
        color_map = {
            AlertSeverity.LOW: "good",
            AlertSeverity.MEDIUM: "warning",
            AlertSeverity.HIGH: "danger",
            AlertSeverity.CRITICAL: "danger"
        }
        
        payload = {
            "channel": self.channel,
            "attachments": [{
                "color": color_map[alert.severity],
                "title": alert.title,
                "text": alert.message,
                "fields": [
                    {"title": "Серьезность", "value": alert.severity.value, "short": True},
                    {"title": "Время", "value": alert.timestamp.isoformat(), "short": True}
                ],
                "footer": "Bot Monitoring System"
            }]
        }
        
        requests.post(self.webhook_url, json=payload)

# Пример использования
alert_manager = AlertManager()

# Добавление каналов уведомлений
email_channel = EmailNotificationChannel(
    smtp_server="smtp.gmail.com",
    smtp_port=587,
    username="bot@example.com",
    password="password",
    recipients=["admin@example.com"]
)

slack_channel = SlackNotificationChannel(
    webhook_url="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
)

alert_manager.add_notification_channel(email_channel)
alert_manager.add_notification_channel(slack_channel)

# Создание алерта
alert_manager.create_alert(
    title="Высокая частота ошибок",
    message="Процент ошибок превысил 10%",
    severity=AlertSeverity.HIGH,
    metadata={"error_rate": 12.5, "time_window": "5min"}
)

Лучшие практики

Конфигурация логирования

# Лучшие практики логирования
import logging.config
import yaml

# Конфигурация через YAML
LOGGING_CONFIG = """
version: 1
disable_existing_loggers: false

formatters:
  standard:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  json:
    format: '{"timestamp": "%(asctime)s", "logger": "%(name)s", "level": "%(levelname)s", "message": "%(message)s"}'

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: standard
    stream: ext://sys.stdout

  file:
    class: logging.handlers.RotatingFileHandler
    level: DEBUG
    formatter: json
    filename: logs/bot.log
    maxBytes: 10485760  # 10MB
    backupCount: 5

  error_file:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: json
    filename: logs/errors.log
    maxBytes: 5242880  # 5MB
    backupCount: 3

loggers:
  bot:
    level: DEBUG
    handlers: [console, file, error_file]
    propagate: false

  requests:
    level: WARNING
    handlers: [console]
    propagate: false

root:
  level: INFO
  handlers: [console]
"""

# Применение конфигурации
logging.config.dictConfig(yaml.safe_load(LOGGING_CONFIG))

# Создание логгера
logger = logging.getLogger('bot')

# Примеры использования
logger.debug("Отладочная информация")
logger.info("Информационное сообщение")
logger.warning("Предупреждение")
logger.error("Ошибка")
logger.critical("Критическая ошибка")

Мониторинг производительности

# Мониторинг производительности
import cProfile
import pstats
import io
from functools import wraps
import time

def profile_performance(func):
    """Декоратор для профилирования производительности"""
    @wraps(func)
    def wrapper(args, *kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        
        start_time = time.time()
        result = func(args, *kwargs)
        end_time = time.time()
        
        profiler.disable()
        
        # Сохранение результатов профилирования
        s = io.StringIO()
        ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
        ps.print_stats(10)  # Топ 10 функций
        
        logger.info(f"Профилирование {func.__name__}: {end_time - start_time:.3f}с")
        logger.debug(f"Детали профилирования:\n{s.getvalue()}")
        
        return result
    return wrapper

# Пример использования
@profile_performance
def expensive_operation():
    """Дорогая операция"""
    time.sleep(0.1)
    return "Результат"

# Выполнение
result = expensive_operation()

Заключение

В этой статье мы рассмотрели комплексный подход к логированию и мониторингу Python приложений:
  • ✅ Настройка и конфигурация логирования
  • ✅ Структурированное и контекстное логирование
  • ✅ Система метрик и мониторинг производительности
  • ✅ Веб-интерфейс для мониторинга
  • ✅ Интеграция с внешними системами (Prometheus, Grafana)
  • ✅ Система алертов и уведомлений
  • ✅ Лучшие практики
Эффективный мониторинг критически важен для поддержания стабильной работы ботов в продакшене.

Полезные ссылки

510 просмотров
33 лайков
0 комментариев