Логирование и мониторинг Python приложений
В этой статье мы рассмотрим эффективные стратегии логирования и мониторинга Python приложений, включая структурированное логирование, метрики, алерты и интеграцию с внешними системами мониторинга.Содержание
- Основы логирования
- Структурированное логирование
- Метрики и производительность
- Мониторинг в реальном времени
- Интеграция с внешними системами
- Алерты и уведомления
- Лучшие практики
Основы логирования
Настройка логирования
# Настройка логирования для Python приложений
import logging
import logging.handlers
import json
from datetime import datetime
from typing import Dict, Any, Optional
import os
class LoggingConfig:
"""Конфигурация логирования"""
def __init__(self, app_name: str, log_level: str = "INFO"):
self.app_name = app_name
self.log_level = getattr(logging, log_level.upper())
self.setup_logging()
def setup_logging(self):
"""Настройка системы логирования"""
# Создание логгера
logger = logging.getLogger(self.app_name)
logger.setLevel(self.log_level)
# Очистка существующих хендлеров
logger.handlers.clear()
# Форматтер
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Консольный хендлер
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Файловый хендлер с ротацией
if not os.path.exists('logs'):
os.makedirs('logs')
file_handler = logging.handlers.RotatingFileHandler(
f'logs/{self.app_name}.log',
maxBytes=1010241024, # 10MB
backupCount=5
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Хендлер для ошибок
error_handler = logging.handlers.RotatingFileHandler(
f'logs/{self.app_name}_errors.log',
maxBytes=510241024, # 5MB
backupCount=3
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
return logger
# Использование
logger_config = LoggingConfig("my_bot", "INFO")
logger = logging.getLogger("my_bot")
# Примеры логирования
logger.info("Приложение запущено")
logger.warning("Предупреждение: низкая память")
logger.error("Ошибка подключения к базе данных")
logger.debug("Отладочная информация")Контекстное логирование
# Контекстное логирование
import logging
from contextvars import ContextVar
from typing import Dict, Any
# Контекстные переменные
user_id_context: ContextVar[str] = ContextVar('user_id', default=None)
request_id_context: ContextVar[str] = ContextVar('request_id', default=None)
session_id_context: ContextVar[str] = ContextVar('session_id', default=None)
class ContextualLogger:
"""Контекстный логгер"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def _get_context(self) -> Dict[str, Any]:
"""Получение контекста"""
context = {}
user_id = user_id_context.get()
if user_id:
context['user_id'] = user_id
request_id = request_id_context.get()
if request_id:
context['request_id'] = request_id
session_id = session_id_context.get()
if session_id:
context['session_id'] = session_id
return context
def info(self, message: str, kwargs):
"""Информационное сообщение с контекстом"""
context = self._get_context()
context.update(kwargs)
self.logger.info(
message,
extra={'context': context}
)
def error(self, message: str, kwargs):
"""Сообщение об ошибке с контекстом"""
context = self._get_context()
context.update(kwargs)
self.logger.error(
message,
extra={'context': context}
)
def warning(self, message: str, kwargs):
"""Предупреждение с контекстом"""
context = self._get_context()
context.update(kwargs)
self.logger.warning(
message,
extra={'context': context}
)
# Контекстный менеджер для установки контекста
class LoggingContext:
"""Контекстный менеджер для логирования"""
def __init__(self, user_id: Optional[str] = None,
request_id: Optional[str] = None,
session_id: Optional[str] = None):
self.user_id = user_id
self.request_id = request_id
self.session_id = session_id
self.tokens = []
def __enter__(self):
"""Вход в контекст"""
if self.user_id:
token = user_id_context.set(self.user_id)
self.tokens.append(token)
if self.request_id:
token = request_id_context.set(self.request_id)
self.tokens.append(token)
if self.session_id:
token = session_id_context.set(self.session_id)
self.tokens.append(token)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Выход из контекста"""
for token in self.tokens:
token.reset()
# Пример использования
contextual_logger = ContextualLogger("bot")
def process_user_request(user_id: str, request_data: Dict[str, Any]):
"""Обработка запроса пользователя с логированием"""
with LoggingContext(user_id=user_id, request_id="req_123"):
contextual_logger.info("Начало обработки запроса", data=request_data)
try:
# Обработка запроса
result = {"status": "success", "data": "processed"}
contextual_logger.info("Запрос успешно обработан", result=result)
return result
except Exception as e:
contextual_logger.error("Ошибка обработки запроса", error=str(e))
raiseСтруктурированное логирование
JSON логирование
# JSON логирование
import json
import logging
from datetime import datetime
from typing import Any, Dict
class JSONFormatter(logging.Formatter):
"""JSON форматтер для логов"""
def format(self, record: logging.LogRecord) -> str:
"""Форматирование записи в JSON"""
log_entry = {
'timestamp': datetime.fromtimestamp(record.created).isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
# Добавление контекста
if hasattr(record, 'context'):
log_entry['context'] = record.context
# Добавление исключения
if record.exc_info:
log_entry['exception'] = self.formatException(record.exc_info)
return json.dumps(log_entry, ensure_ascii=False)
class StructuredLogger:
"""Структурированный логгер"""
def __init__(self, name: str, log_file: Optional[str] = None):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# Очистка существующих хендлеров
self.logger.handlers.clear()
# JSON форматтер
json_formatter = JSONFormatter()
# Консольный хендлер
console_handler = logging.StreamHandler()
console_handler.setFormatter(json_formatter)
self.logger.addHandler(console_handler)
# Файловый хендлер
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(json_formatter)
self.logger.addHandler(file_handler)
def log_event(self, event_type: str, message: str, kwargs):
"""Логирование события"""
self.logger.info(
message,
extra={
'context': {
'event_type': event_type,
kwargs
}
}
)
def log_user_action(self, user_id: str, action: str, kwargs):
"""Логирование действия пользователя"""
self.logger.info(
f"User action: {action}",
extra={
'context': {
'user_id': user_id,
'action': action,
kwargs
}
}
)
def log_performance(self, operation: str, duration: float, kwargs):
"""Логирование производительности"""
self.logger.info(
f"Performance: {operation}",
extra={
'context': {
'operation': operation,
'duration': duration,
'performance_metric': True,
kwargs
}
}
)
def log_error(self, error: Exception, context: Dict[str, Any] = None):
"""Логирование ошибки"""
self.logger.error(
f"Error: {str(error)}",
extra={
'context': context or {},
'error_type': type(error).__name__
},
exc_info=True
)
# Пример использования
structured_logger = StructuredLogger("bot", "logs/bot_structured.log")
# Логирование событий
structured_logger.log_event(
"user_registration",
"Новый пользователь зарегистрирован",
user_id="12345",
email="user@example.com"
)
# Логирование производительности
import time
start_time = time.time()
# ... выполнение операции ...
duration = time.time() - start_time
structured_logger.log_performance(
"database_query",
duration,
query_type="SELECT",
table="users"
)Логирование с фильтрацией
# Логирование с фильтрацией
import logging
from typing import Set, Optional
class SensitiveDataFilter(logging.Filter):
"""Фильтр для скрытия чувствительных данных"""
def __init__(self, sensitive_fields: Set[str]):
self.sensitive_fields = sensitive_fields
def filter(self, record: logging.LogRecord) -> bool:
"""Фильтрация чувствительных данных"""
if hasattr(record, 'context'):
context = record.context.copy()
for field in self.sensitive_fields:
if field in context:
context[field] = "REDACTED"
record.context = context
return True
class LogLevelFilter(logging.Filter):
"""Фильтр по уровню логирования"""
def __init__(self, min_level: int):
self.min_level = min_level
def filter(self, record: logging.LogRecord) -> bool:
"""Фильтрация по уровню"""
return record.levelno >= self.min_level
class ConditionalLogger:
"""Условный логгер"""
def __init__(self, name: str, enable_debug: bool = False):
self.logger = logging.getLogger(name)
self.enable_debug = enable_debug
# Настройка фильтров
sensitive_filter = SensitiveDataFilter({
'password', 'token', 'api_key', 'secret'
})
self.logger.addFilter(sensitive_filter)
if not enable_debug:
debug_filter = LogLevelFilter(logging.INFO)
self.logger.addFilter(debug_filter)
def debug(self, message: str, kwargs):
"""Отладочное сообщение"""
if self.enable_debug:
self.logger.debug(message, extra={'context': kwargs})
def info(self, message: str, kwargs):
"""Информационное сообщение"""
self.logger.info(message, extra={'context': kwargs})
def warning(self, message: str, kwargs):
"""Предупреждение"""
self.logger.warning(message, extra={'context': kwargs})
def error(self, message: str, kwargs):
"""Ошибка"""
self.logger.error(message, extra={'context': kwargs})
# Пример использования
conditional_logger = ConditionalLogger("bot", enable_debug=True)
# Логирование с чувствительными данными
conditional_logger.info(
"Пользователь авторизован",
user_id="12345",
password="secret123", # Будет скрыто
api_key="sk-123456789" # Будет скрыто
)Метрики и производительность
Система метрик
# Система метрик для мониторинга
import time
import threading
from collections import defaultdict, deque
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class Metric:
"""Метрика"""
name: str
value: float
timestamp: float
tags: Dict[str, str]
class MetricsCollector:
"""Сборщик метрик"""
def __init__(self):
self.counters = defaultdict(int)
self.gauges = defaultdict(float)
self.histograms = defaultdict(list)
self.timers = defaultdict(list)
self.lock = threading.Lock()
def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
"""Увеличение счетчика"""
with self.lock:
self.counters[name] += value
def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
"""Установка значения"""
with self.lock:
self.gauges[name] = value
def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None):
"""Запись в гистограмму"""
with self.lock:
self.histograms[name].append(value)
def record_timer(self, name: str, duration: float, tags: Dict[str, str] = None):
"""Запись времени выполнения"""
with self.lock:
self.timers[name].append(duration)
def get_counter(self, name: str) -> int:
"""Получение значения счетчика"""
with self.lock:
return self.counters[name]
def get_gauge(self, name: str) -> float:
"""Получение значения gauge"""
with self.lock:
return self.gauges[name]
def get_histogram_stats(self, name: str) -> Dict[str, float]:
"""Получение статистики гистограммы"""
with self.lock:
values = self.histograms[name]
if not values:
return {}
return {
'count': len(values),
'min': min(values),
'max': max(values),
'avg': sum(values) / len(values),
'p50': self._percentile(values, 50),
'p95': self._percentile(values, 95),
'p99': self._percentile(values, 99)
}
def get_timer_stats(self, name: str) -> Dict[str, float]:
"""Получение статистики таймера"""
with self.lock:
values = self.timers[name]
if not values:
return {}
return {
'count': len(values),
'min': min(values),
'max': max(values),
'avg': sum(values) / len(values),
'p50': self._percentile(values, 50),
'p95': self._percentile(values, 95),
'p99': self._percentile(values, 99)
}
def _percentile(self, values: List[float], percentile: int) -> float:
"""Вычисление перцентиля"""
sorted_values = sorted(values)
index = int(len(sorted_values) * percentile / 100)
return sorted_values[min(index, len(sorted_values) - 1)]
def get_all_metrics(self) -> Dict[str, Any]:
"""Получение всех метрик"""
with self.lock:
return {
'counters': dict(self.counters),
'gauges': dict(self.gauges),
'histograms': {name: self.get_histogram_stats(name)
for name in self.histograms},
'timers': {name: self.get_timer_stats(name)
for name in self.timers}
}
# Декоратор для автоматического измерения производительности
def measure_performance(metrics: MetricsCollector, operation_name: str):
"""Декоратор для измерения производительности"""
def decorator(func):
def wrapper(args, *kwargs):
start_time = time.time()
try:
result = func(args, *kwargs)
duration = time.time() - start_time
# Запись метрики успеха
metrics.record_timer(f"{operation_name}.success", duration)
metrics.increment_counter(f"{operation_name}.success_count")
return result
except Exception as e:
duration = time.time() - start_time
# Запись метрики ошибки
metrics.record_timer(f"{operation_name}.error", duration)
metrics.increment_counter(f"{operation_name}.error_count")
raise e
return wrapper
return decorator
# Пример использования
metrics = MetricsCollector()
@measure_performance(metrics, "database_query")
def database_query(query: str):
"""Имитация запроса к базе данных"""
time.sleep(0.1) # Имитация работы
return {"result": "data"}
# Использование метрик
metrics.increment_counter("requests_total")
metrics.set_gauge("active_connections", 42)
metrics.record_histogram("response_size", 1024)
# Выполнение операций
for i in range(10):
try:
database_query("SELECT * FROM users")
except Exception as e:
pass
# Получение статистики
stats = metrics.get_timer_stats("database_query")
print(f"Статистика запросов к БД: {stats}")Мониторинг ресурсов
# Мониторинг ресурсов системы
import psutil
import threading
import time
from typing import Dict, Any
class SystemMonitor:
"""Мониторинг системных ресурсов"""
def __init__(self, metrics: MetricsCollector):
self.metrics = metrics
self.monitoring = False
self.monitor_thread = None
def start_monitoring(self, interval: float = 60.0):
"""Запуск мониторинга"""
if self.monitoring:
return
self.monitoring = True
self.monitor_thread = threading.Thread(
target=self._monitor_loop,
args=(interval,),
daemon=True
)
self.monitor_thread.start()
def stop_monitoring(self):
"""Остановка мониторинга"""
self.monitoring = False
if self.monitor_thread:
self.monitor_thread.join()
def _monitor_loop(self, interval: float):
"""Цикл мониторинга"""
while self.monitoring:
try:
self._collect_system_metrics()
time.sleep(interval)
except Exception as e:
print(f"Ошибка мониторинга: {e}")
time.sleep(interval)
def _collect_system_metrics(self):
"""Сбор системных метрик"""
# CPU
cpu_percent = psutil.cpu_percent(interval=1)
self.metrics.set_gauge("system.cpu_percent", cpu_percent)
# Память
memory = psutil.virtual_memory()
self.metrics.set_gauge("system.memory_percent", memory.percent)
self.metrics.set_gauge("system.memory_available_mb", memory.available / 1024 / 1024)
# Диск
disk = psutil.disk_usage('/')
self.metrics.set_gauge("system.disk_percent", disk.percent)
self.metrics.set_gauge("system.disk_free_gb", disk.free / 1024 / 1024 / 1024)
# Сетевые соединения
connections = len(psutil.net_connections())
self.metrics.set_gauge("system.network_connections", connections)
# Процессы
processes = len(psutil.pids())
self.metrics.set_gauge("system.processes_count", processes)
def get_system_info(self) -> Dict[str, Any]:
"""Получение информации о системе"""
return {
'cpu_percent': psutil.cpu_percent(),
'memory': {
'total': psutil.virtual_memory().total,
'available': psutil.virtual_memory().available,
'percent': psutil.virtual_memory().percent
},
'disk': {
'total': psutil.disk_usage('/').total,
'free': psutil.disk_usage('/').free,
'percent': psutil.disk_usage('/').percent
},
'network_connections': len(psutil.net_connections()),
'processes': len(psutil.pids())
}
# Пример использования
metrics = MetricsCollector()
system_monitor = SystemMonitor(metrics)
# Запуск мониторинга
system_monitor.start_monitoring(interval=30.0) # Каждые 30 секунд
# Получение информации о системе
system_info = system_monitor.get_system_info()
print(f"Информация о системе: {system_info}")Мониторинг в реальном времени
Веб-интерфейс мониторинга
# Веб-интерфейс для мониторинга
from flask import Flask, jsonify, render_template_string
import json
from datetime import datetime
app = Flask(__name__)
# HTML шаблон для дашборда
DASHBOARD_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>Мониторинг бота</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.metric-card {
border: 1px solid #ddd;
padding: 15px;
margin: 10px;
border-radius: 5px;
display: inline-block;
width: 200px;
}
.metric-value { font-size: 24px; font-weight: bold; color: #333; }
.metric-label { color: #666; }
.chart-container { width: 400px; height: 200px; margin: 20px; }
</style>
</head>
<body>
<h1>Мониторинг бота</h1>
<div id="metrics">
<div class="metric-card">
<div class="metric-value" id="requests-total">0</div>
<div class="metric-label">Всего запросов</div>
</div>
<div class="metric-card">
<div class="metric-value" id="error-rate">0%</div>
<div class="metric-label">Процент ошибок</div>
</div>
<div class="metric-card">
<div class="metric-value" id="avg-response">0ms</div>
<div class="metric-label">Среднее время ответа</div>
</div>
<div class="metric-card">
<div class="metric-value" id="cpu-usage">0%</div>
<div class="metric-label">Использование CPU</div>
</div>
</div>
<div class="chart-container">
<canvas id="responseTimeChart"></canvas>
</div>
<div class="chart-container">
<canvas id="errorRateChart"></canvas>
</div>
<script>
// Обновление метрик каждые 5 секунд
setInterval(updateMetrics, 5000);
function updateMetrics() {
fetch('/api/metrics')
.then(response => response.json())
.then(data => {
document.getElementById('requests-total').textContent = data.counters['requests_total'] || 0;
document.getElementById('error-rate').textContent =
((data.counters['database_query.error_count'] || 0) /
(data.counters['database_query.success_count'] || 1) * 100).toFixed(1) + '%';
document.getElementById('avg-response').textContent =
(data.timers['database_query.success']?.avg * 1000 || 0).toFixed(0) + 'ms';
document.getElementById('cpu-usage').textContent =
(data.gauges['system.cpu_percent'] || 0).toFixed(1) + '%';
});
}
// Инициализация графиков
const responseTimeCtx = document.getElementById('responseTimeChart').getContext('2d');
const responseTimeChart = new Chart(responseTimeCtx, {
type: 'line',
data: {
labels: [],
datasets: [{
label: 'Время ответа (мс)',
data: [],
borderColor: 'rgb(75, 192, 192)',
tension: 0.1
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true
}
}
}
});
// Обновление графиков
setInterval(() => {
fetch('/api/metrics')
.then(response => response.json())
.then(data => {
const now = new Date().toLocaleTimeString();
const avgTime = data.timers['database_query.success']?.avg * 1000 || 0;
responseTimeChart.data.labels.push(now);
responseTimeChart.data.datasets[0].data.push(avgTime);
if (responseTimeChart.data.labels.length > 20) {
responseTimeChart.data.labels.shift();
responseTimeChart.data.datasets[0].data.shift();
}
responseTimeChart.update();
});
}, 5000);
// Первоначальная загрузка
updateMetrics();
</script>
</body>
</html>
"""
@app.route('/')
def dashboard():
"""Дашборд мониторинга"""
return render_template_string(DASHBOARD_TEMPLATE)
@app.route('/api/metrics')
def get_metrics():
"""API для получения метрик"""
return jsonify(metrics.get_all_metrics())
@app.route('/api/system')
def get_system_info():
"""API для получения информации о системе"""
return jsonify(system_monitor.get_system_info())
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)Интеграция с внешними системами
Интеграция с Prometheus
# Интеграция с Prometheus
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
# Создание метрик Prometheus
REQUEST_COUNT = Counter('bot_requests_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('bot_request_duration_seconds', 'Request duration')
ACTIVE_CONNECTIONS = Gauge('bot_active_connections', 'Active connections')
ERROR_COUNT = Counter('bot_errors_total', 'Total errors', ['error_type'])
class PrometheusMetrics:
"""Метрики Prometheus"""
def __init__(self, port: int = 8000):
self.port = port
start_http_server(port)
def record_request(self, method: str, endpoint: str, duration: float):
"""Запись метрики запроса"""
REQUEST_COUNT.labels(method=method, endpoint=endpoint).inc()
REQUEST_DURATION.observe(duration)
def set_active_connections(self, count: int):
"""Установка количества активных соединений"""
ACTIVE_CONNECTIONS.set(count)
def record_error(self, error_type: str):
"""Запись ошибки"""
ERROR_COUNT.labels(error_type=error_type).inc()
# Пример использования
prometheus_metrics = PrometheusMetrics(port=8000)
# Запись метрики
prometheus_metrics.record_request('GET', '/api/users', 0.123)
prometheus_metrics.set_active_connections(42)
prometheus_metrics.record_error('database_error')Интеграция с Grafana
# Интеграция с Grafana через InfluxDB
from influxdb import InfluxDBClient
import json
from datetime import datetime
class InfluxDBMetrics:
"""Метрики для InfluxDB"""
def __init__(self, host: str = 'localhost', port: int = 8086,
database: str = 'bot_metrics'):
self.client = InfluxDBClient(host=host, port=port)
self.database = database
self._ensure_database()
def _ensure_database(self):
"""Создание базы данных если не существует"""
databases = self.client.get_list_database()
if not any(db['name'] == self.database for db in databases):
self.client.create_database(self.database)
self.client.switch_database(self.database)
def write_metric(self, measurement: str, fields: Dict[str, Any],
tags: Dict[str, str] = None):
"""Запись метрики"""
point = {
'measurement': measurement,
'time': datetime.utcnow(),
'fields': fields,
'tags': tags or {}
}
self.client.write_points([point])
def write_request_metric(self, method: str, endpoint: str,
duration: float, status_code: int):
"""Запись метрики запроса"""
self.write_metric(
'requests',
{
'duration': duration,
'status_code': status_code
},
{
'method': method,
'endpoint': endpoint
}
)
def write_system_metric(self, cpu_percent: float, memory_percent: float):
"""Запись системной метрики"""
self.write_metric(
'system',
{
'cpu_percent': cpu_percent,
'memory_percent': memory_percent
}
)
# Пример использования
influx_metrics = InfluxDBMetrics()
# Запись метрик
influx_metrics.write_request_metric('GET', '/api/users', 0.123, 200)
influx_metrics.write_system_metric(45.2, 67.8)Алерты и уведомления
Система алертов
# Система алертов
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum
class AlertSeverity(Enum):
"""Уровни серьезности алертов"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class Alert:
"""Алерт"""
id: str
title: str
message: str
severity: AlertSeverity
timestamp: datetime
metadata: Dict[str, Any]
class AlertManager:
"""Менеджер алертов"""
def __init__(self):
self.alerts = []
self.notification_channels = []
def add_notification_channel(self, channel):
"""Добавление канала уведомлений"""
self.notification_channels.append(channel)
def create_alert(self, title: str, message: str,
severity: AlertSeverity, metadata: Dict[str, Any] = None):
"""Создание алерта"""
alert = Alert(
id=f"alert_{int(time.time())}",
title=title,
message=message,
severity=severity,
timestamp=datetime.now(),
metadata=metadata or {}
)
self.alerts.append(alert)
self._send_notifications(alert)
return alert
def _send_notifications(self, alert: Alert):
"""Отправка уведомлений"""
for channel in self.notification_channels:
try:
channel.send(alert)
except Exception as e:
print(f"Ошибка отправки уведомления: {e}")
class EmailNotificationChannel:
"""Канал уведомлений по email"""
def __init__(self, smtp_server: str, smtp_port: int,
username: str, password: str, recipients: List[str]):
self.smtp_server = smtp_server
self.smtp_port = smtp_port
self.username = username
self.password = password
self.recipients = recipients
def send(self, alert: Alert):
"""Отправка email уведомления"""
msg = MIMEMultipart()
msg['From'] = self.username
msg['To'] = ', '.join(self.recipients)
msg['Subject'] = f"[{alert.severity.value.upper()}] {alert.title}"
body = f"""
Алерт: {alert.title}
Сообщение: {alert.message}
Серьезность: {alert.severity.value}
Время: {alert.timestamp}
Метаданные: {json.dumps(alert.metadata, indent=2)}
"""
msg.attach(MIMEText(body, 'plain'))
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
server.starttls()
server.login(self.username, self.password)
server.send_message(msg)
class SlackNotificationChannel:
"""Канал уведомлений в Slack"""
def __init__(self, webhook_url: str, channel: str = "#alerts"):
self.webhook_url = webhook_url
self.channel = channel
def send(self, alert: Alert):
"""Отправка уведомления в Slack"""
color_map = {
AlertSeverity.LOW: "good",
AlertSeverity.MEDIUM: "warning",
AlertSeverity.HIGH: "danger",
AlertSeverity.CRITICAL: "danger"
}
payload = {
"channel": self.channel,
"attachments": [{
"color": color_map[alert.severity],
"title": alert.title,
"text": alert.message,
"fields": [
{"title": "Серьезность", "value": alert.severity.value, "short": True},
{"title": "Время", "value": alert.timestamp.isoformat(), "short": True}
],
"footer": "Bot Monitoring System"
}]
}
requests.post(self.webhook_url, json=payload)
# Пример использования
alert_manager = AlertManager()
# Добавление каналов уведомлений
email_channel = EmailNotificationChannel(
smtp_server="smtp.gmail.com",
smtp_port=587,
username="bot@example.com",
password="password",
recipients=["admin@example.com"]
)
slack_channel = SlackNotificationChannel(
webhook_url="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
)
alert_manager.add_notification_channel(email_channel)
alert_manager.add_notification_channel(slack_channel)
# Создание алерта
alert_manager.create_alert(
title="Высокая частота ошибок",
message="Процент ошибок превысил 10%",
severity=AlertSeverity.HIGH,
metadata={"error_rate": 12.5, "time_window": "5min"}
)Лучшие практики
Конфигурация логирования
# Лучшие практики логирования
import logging.config
import yaml
# Конфигурация через YAML
LOGGING_CONFIG = """
version: 1
disable_existing_loggers: false
formatters:
standard:
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
json:
format: '{"timestamp": "%(asctime)s", "logger": "%(name)s", "level": "%(levelname)s", "message": "%(message)s"}'
handlers:
console:
class: logging.StreamHandler
level: INFO
formatter: standard
stream: ext://sys.stdout
file:
class: logging.handlers.RotatingFileHandler
level: DEBUG
formatter: json
filename: logs/bot.log
maxBytes: 10485760 # 10MB
backupCount: 5
error_file:
class: logging.handlers.RotatingFileHandler
level: ERROR
formatter: json
filename: logs/errors.log
maxBytes: 5242880 # 5MB
backupCount: 3
loggers:
bot:
level: DEBUG
handlers: [console, file, error_file]
propagate: false
requests:
level: WARNING
handlers: [console]
propagate: false
root:
level: INFO
handlers: [console]
"""
# Применение конфигурации
logging.config.dictConfig(yaml.safe_load(LOGGING_CONFIG))
# Создание логгера
logger = logging.getLogger('bot')
# Примеры использования
logger.debug("Отладочная информация")
logger.info("Информационное сообщение")
logger.warning("Предупреждение")
logger.error("Ошибка")
logger.critical("Критическая ошибка")Мониторинг производительности
# Мониторинг производительности
import cProfile
import pstats
import io
from functools import wraps
import time
def profile_performance(func):
"""Декоратор для профилирования производительности"""
@wraps(func)
def wrapper(args, *kwargs):
profiler = cProfile.Profile()
profiler.enable()
start_time = time.time()
result = func(args, *kwargs)
end_time = time.time()
profiler.disable()
# Сохранение результатов профилирования
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats(10) # Топ 10 функций
logger.info(f"Профилирование {func.__name__}: {end_time - start_time:.3f}с")
logger.debug(f"Детали профилирования:\n{s.getvalue()}")
return result
return wrapper
# Пример использования
@profile_performance
def expensive_operation():
"""Дорогая операция"""
time.sleep(0.1)
return "Результат"
# Выполнение
result = expensive_operation()Заключение
В этой статье мы рассмотрели комплексный подход к логированию и мониторингу Python приложений:- ✅ Настройка и конфигурация логирования
- ✅ Структурированное и контекстное логирование
- ✅ Система метрик и мониторинг производительности
- ✅ Веб-интерфейс для мониторинга
- ✅ Интеграция с внешними системами (Prometheus, Grafana)
- ✅ Система алертов и уведомлений
- ✅ Лучшие практики
Полезные ссылки
510 просмотров
33 лайков
0 комментариев
Комментарии (0)
Пока нет комментариев. Будьте первым!