Интеграция Telegram бота с базой данных
В этой статье мы рассмотрим, как интегрировать Telegram бота с различными базами данных для хранения пользовательских данных, статистики и настроек.Содержание
- Выбор базы данных
- Настройка SQLite
- Интеграция PostgreSQL
- Модели данных
- CRUD операции
- Кэширование
- Миграции
Выбор базы данных
SQLite - для простых проектов
# Преимущества SQLite:
# ✅ Простота настройки
# ✅ Не требует отдельного сервера
# ✅ Идеально для MVP
# ✅ Встроенная поддержка в Python
# Недостатки:
# ❌ Ограниченная производительность
# ❌ Нет сетевого доступа
# ❌ Ограниченная поддержка конкурентностиPostgreSQL - для серьезных проектов
# Преимущества PostgreSQL:
# ✅ Высокая производительность
# ✅ Поддержка JSON
# ✅ Расширяемость
# ✅ Надежность
# Недостатки:
# ❌ Сложность настройки
# ❌ Требует отдельный сервер
# ❌ Больше ресурсовНастройка SQLite
Установка зависимостей
pip install sqlalchemy sqlite3Создание модели базы данных
# models/database.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, Text, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import os
# Создание движка базы данных
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///bot.db')
engine = create_engine(DATABASE_URL, echo=True)
# Создание сессии
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Базовый класс для моделей
Base = declarative_base()
class User(Base):
"""Модель пользователя"""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
telegram_id = Column(Integer, unique=True, index=True, nullable=False)
username = Column(String(255), nullable=True)
first_name = Column(String(255), nullable=False)
last_name = Column(String(255), nullable=True)
language_code = Column(String(10), default='ru')
is_bot = Column(Boolean, default=False)
is_premium = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
last_activity = Column(DateTime, default=datetime.utcnow)
# Дополнительные поля
subscription_type = Column(String(50), default='free')
subscription_expires = Column(DateTime, nullable=True)
settings = Column(Text, default='{}') # JSON настройки
balance = Column(Float, default=0.0)
class Chat(Base):
"""Модель чата"""
__tablename__ = "chats"
id = Column(Integer, primary_key=True, index=True)
telegram_id = Column(Integer, unique=True, index=True, nullable=False)
chat_type = Column(String(50), nullable=False) # private, group, supergroup, channel
title = Column(String(255), nullable=True)
username = Column(String(255), nullable=True)
description = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Message(Base):
"""Модель сообщения"""
__tablename__ = "messages"
id = Column(Integer, primary_key=True, index=True)
message_id = Column(Integer, nullable=False)
user_id = Column(Integer, nullable=False)
chat_id = Column(Integer, nullable=False)
text = Column(Text, nullable=True)
message_type = Column(String(50), default='text') # text, photo, document, etc.
file_id = Column(String(255), nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
class BotStats(Base):
"""Статистика бота"""
__tablename__ = "bot_stats"
id = Column(Integer, primary_key=True, index=True)
date = Column(DateTime, default=datetime.utcnow)
total_users = Column(Integer, default=0)
active_users = Column(Integer, default=0)
messages_sent = Column(Integer, default=0)
commands_used = Column(Integer, default=0)
# Создание всех таблиц
def create_tables():
Base.metadata.create_all(bind=engine)
# Получение сессии базы данных
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()Сервисы для работы с данными
# services/user_service.py
from sqlalchemy.orm import Session
from models.database import User, Chat, Message
from datetime import datetime
import json
class UserService:
def __init__(self, db: Session):
self.db = db
def create_user(self, telegram_user):
"""Создание нового пользователя"""
user = User(
telegram_id=telegram_user.id,
username=telegram_user.username,
first_name=telegram_user.first_name,
last_name=telegram_user.last_name,
language_code=telegram_user.language_code,
is_bot=telegram_user.is_bot
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return user
def get_user(self, telegram_id: int):
"""Получение пользователя по Telegram ID"""
return self.db.query(User).filter(User.telegram_id == telegram_id).first()
def update_user_activity(self, telegram_id: int):
"""Обновление времени последней активности"""
user = self.get_user(telegram_id)
if user:
user.last_activity = datetime.utcnow()
self.db.commit()
def update_user_settings(self, telegram_id: int, settings: dict):
"""Обновление настроек пользователя"""
user = self.get_user(telegram_id)
if user:
user.settings = json.dumps(settings)
user.updated_at = datetime.utcnow()
self.db.commit()
def get_user_settings(self, telegram_id: int):
"""Получение настроек пользователя"""
user = self.get_user(telegram_id)
if user and user.settings:
return json.loads(user.settings)
return {}
def get_all_users(self, limit: int = 100, offset: int = 0):
"""Получение списка всех пользователей"""
return self.db.query(User).offset(offset).limit(limit).all()
def get_active_users(self, days: int = 7):
"""Получение активных пользователей за последние N дней"""
from datetime import timedelta
cutoff_date = datetime.utcnow() - timedelta(days=days)
return self.db.query(User).filter(User.last_activity >= cutoff_date).all()
def update_balance(self, telegram_id: int, amount: float):
"""Обновление баланса пользователя"""
user = self.get_user(telegram_id)
if user:
user.balance += amount
self.db.commit()
return user.balance
return None
class ChatService:
def __init__(self, db: Session):
self.db = db
def create_chat(self, telegram_chat):
"""Создание нового чата"""
chat = Chat(
telegram_id=telegram_chat.id,
chat_type=telegram_chat.type,
title=telegram_chat.title,
username=telegram_chat.username,
description=telegram_chat.description
)
self.db.add(chat)
self.db.commit()
self.db.refresh(chat)
return chat
def get_chat(self, telegram_id: int):
"""Получение чата по Telegram ID"""
return self.db.query(Chat).filter(Chat.telegram_id == telegram_id).first()
class MessageService:
def __init__(self, db: Session):
self.db = db
def save_message(self, message_data):
"""Сохранение сообщения"""
message = Message(
message_id=message_data['message_id'],
user_id=message_data['user_id'],
chat_id=message_data['chat_id'],
text=message_data.get('text'),
message_type=message_data.get('type', 'text'),
file_id=message_data.get('file_id')
)
self.db.add(message)
self.db.commit()
self.db.refresh(message)
return message
def get_user_messages(self, user_id: int, limit: int = 50):
"""Получение сообщений пользователя"""
return self.db.query(Message).filter(
Message.user_id == user_id
).order_by(Message.created_at.desc()).limit(limit).all()Интеграция PostgreSQL
Настройка PostgreSQL
# config/database.py
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# PostgreSQL конфигурация
POSTGRES_USER = os.getenv('POSTGRES_USER', 'bot_user')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'bot_password')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
POSTGRES_DB = os.getenv('POSTGRES_DB', 'telegram_bot')
# URL подключения к PostgreSQL
DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
# Создание движка
engine = create_engine(DATABASE_URL, echo=True)
# Создание сессии
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)Docker Compose для PostgreSQL
# docker-compose.yml
version: '3.8'
services:
postgres:
image: postgres:15
container_name: telegram-bot-postgres
environment:
POSTGRES_DB: telegram_bot
POSTGRES_USER: bot_user
POSTGRES_PASSWORD: bot_password
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
networks:
- bot-network
telegram-bot:
build: .
container_name: telegram-bot
environment:
- BOT_TOKEN=${BOT_TOKEN}
- DATABASE_URL=postgresql://bot_user:bot_password@postgres:5432/telegram_bot
depends_on:
- postgres
networks:
- bot-network
volumes:
postgres_data:
networks:
bot-network:
driver: bridgeCRUD операции
Интеграция с ботом
# bot_with_database.py
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from sqlalchemy.orm import Session
from models.database import get_db, create_tables
from services.user_service import UserService
from services.chat_service import ChatService
from services.message_service import MessageService
import os
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
BOT_TOKEN = os.getenv('BOT_TOKEN')
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик команды /start с сохранением в БД"""
user = update.effective_user
chat = update.effective_chat
# Получение сессии базы данных
db = next(get_db())
try:
# Создание сервисов
user_service = UserService(db)
chat_service = ChatService(db)
# Проверка существования пользователя
existing_user = user_service.get_user(user.id)
if not existing_user:
# Создание нового пользователя
new_user = user_service.create_user(user)
logger.info(f"Создан новый пользователь: {new_user.first_name}")
else:
# Обновление активности существующего пользователя
user_service.update_user_activity(user.id)
# Обработка чата
existing_chat = chat_service.get_chat(chat.id)
if not existing_chat:
chat_service.create_chat(chat)
await update.message.reply_text(
f'Привет, {user.first_name}! 👋\n\n'
'Я бот с интеграцией базы данных!\n'
'Используй /profile для просмотра профиля.'
)
except Exception as e:
logger.error(f"Ошибка при работе с БД: {e}")
await update.message.reply_text("Произошла ошибка. Попробуйте позже.")
finally:
db.close()
async def profile(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Просмотр профиля пользователя"""
user = update.effective_user
db = next(get_db())
try:
user_service = UserService(db)
db_user = user_service.get_user(user.id)
if db_user:
profile_text = f"""
👤 Профиль пользователя
🆔 Telegram ID: {db_user.telegram_id}
👤 Имя: {db_user.first_name}
📝 Username: @{db_user.username or 'не указан'}
🌐 Язык: {db_user.language_code}
💎 Премиум: {'Да' if db_user.is_premium else 'Нет'}
💰 Баланс: {db_user.balance} руб.
📅 Регистрация: {db_user.created_at.strftime('%d.%m.%Y %H:%M')}
🕐 Последняя активность: {db_user.last_activity.strftime('%d.%m.%Y %H:%M')}
"""
await update.message.reply_text(profile_text, parse_mode='Markdown')
else:
await update.message.reply_text("Пользователь не найден в базе данных.")
except Exception as e:
logger.error(f"Ошибка при получении профиля: {e}")
await update.message.reply_text("Произошла ошибка при получении профиля.")
finally:
db.close()
async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Статистика бота (только для админов)"""
user = update.effective_user
# Проверка прав администратора
admin_ids = [int(x) for x in os.getenv('ADMIN_IDS', '').split(',') if x]
if user.id not in admin_ids:
await update.message.reply_text("❌ У вас нет прав для просмотра статистики.")
return
db = next(get_db())
try:
user_service = UserService(db)
# Получение статистики
total_users = len(user_service.get_all_users(limit=10000))
active_users = len(user_service.get_active_users(days=7))
stats_text = f"""
📊 Статистика бота
👥 Всего пользователей: {total_users}
🟢 Активных за неделю: {active_users}
📈 Активность: {(active_users/total_users*100):.1f}% если total_users > 0 else 0}%
"""
await update.message.reply_text(stats_text, parse_mode='Markdown')
except Exception as e:
logger.error(f"Ошибка при получении статистики: {e}")
await update.message.reply_text("Произошла ошибка при получении статистики.")
finally:
db.close()
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка всех сообщений с сохранением в БД"""
user = update.effective_user
chat = update.effective_chat
message = update.message
db = next(get_db())
try:
# Обновление активности пользователя
user_service = UserService(db)
user_service.update_user_activity(user.id)
# Сохранение сообщения
message_service = MessageService(db)
message_data = {
'message_id': message.message_id,
'user_id': user.id,
'chat_id': chat.id,
'text': message.text,
'type': 'text'
}
message_service.save_message(message_data)
# Простой ответ
await message.reply_text(f"Получено: {message.text}")
except Exception as e:
logger.error(f"Ошибка при обработке сообщения: {e}")
finally:
db.close()
def main():
"""Основная функция запуска бота"""
if not BOT_TOKEN:
logger.error("BOT_TOKEN не найден!")
return
# Создание таблиц базы данных
create_tables()
# Создание приложения
application = Application.builder().token(BOT_TOKEN).build()
# Добавление обработчиков
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("profile", profile))
application.add_handler(CommandHandler("stats", stats))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
# Запуск бота
logger.info("Бот с базой данных запущен!")
application.run_polling()
if __name__ == '__main__':
main()Кэширование
Redis для кэширования
# services/cache_service.py
import redis
import json
from datetime import timedelta
import os
class CacheService:
def __init__(self):
self.redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6379)),
db=int(os.getenv('REDIS_DB', 0)),
decode_responses=True
)
def get_user_cache(self, user_id: int):
"""Получение кэша пользователя"""
key = f"user:{user_id}"
data = self.redis_client.get(key)
return json.loads(data) if data else None
def set_user_cache(self, user_id: int, data: dict, ttl: int = 3600):
"""Сохранение кэша пользователя"""
key = f"user:{user_id}"
self.redis_client.setex(key, ttl, json.dumps(data))
def get_stats_cache(self):
"""Получение кэша статистики"""
data = self.redis_client.get("bot_stats")
return json.loads(data) if data else None
def set_stats_cache(self, stats: dict, ttl: int = 300):
"""Сохранение кэша статистики"""
self.redis_client.setex("bot_stats", ttl, json.dumps(stats))
def clear_user_cache(self, user_id: int):
"""Очистка кэша пользователя"""
key = f"user:{user_id}"
self.redis_client.delete(key)Миграции
Alembic для миграций
# migrations/env.py
from alembic import context
from sqlalchemy import engine_from_config, pool
from models.database import Base
# Конфигурация Alembic
config = context.config
# Импорт моделей для автогенерации
target_metadata = Base.metadata
def run_migrations_offline():
"""Запуск миграций в офлайн режиме"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Запуск миграций в онлайн режиме"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()Заключение
В этой статье мы рассмотрели:- ✅ Выбор подходящей базы данных
- ✅ Настройку SQLite и PostgreSQL
- ✅ Создание моделей данных
- ✅ CRUD операции через сервисы
- ✅ Интеграцию с Telegram ботом
- ✅ Кэширование с Redis
- ✅ Миграции с Alembic
Полезные ссылки
1475 просмотров
27 лайков
0 комментариев
Комментарии (0)
Пока нет комментариев. Будьте первым!