Интеграция Telegram бота с базой данных

В этой статье мы рассмотрим, как интегрировать Telegram бота с различными базами данных для хранения пользовательских данных, статистики и настроек.

Содержание

Выбор базы данных

SQLite - для простых проектов

# Преимущества SQLite:
# ✅ Простота настройки
# ✅ Не требует отдельного сервера
# ✅ Идеально для MVP
# ✅ Встроенная поддержка в Python

# Недостатки:
# ❌ Ограниченная производительность
# ❌ Нет сетевого доступа
# ❌ Ограниченная поддержка конкурентности

PostgreSQL - для серьезных проектов

# Преимущества PostgreSQL:
# ✅ Высокая производительность
# ✅ Поддержка JSON
# ✅ Расширяемость
# ✅ Надежность

# Недостатки:
# ❌ Сложность настройки
# ❌ Требует отдельный сервер
# ❌ Больше ресурсов

Настройка SQLite

Установка зависимостей

pip install sqlalchemy sqlite3

Создание модели базы данных

# models/database.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, Text, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import os

# Создание движка базы данных
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///bot.db')
engine = create_engine(DATABASE_URL, echo=True)

# Создание сессии
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Базовый класс для моделей
Base = declarative_base()

class User(Base):
    """Модель пользователя"""
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    telegram_id = Column(Integer, unique=True, index=True, nullable=False)
    username = Column(String(255), nullable=True)
    first_name = Column(String(255), nullable=False)
    last_name = Column(String(255), nullable=True)
    language_code = Column(String(10), default='ru')
    is_bot = Column(Boolean, default=False)
    is_premium = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    last_activity = Column(DateTime, default=datetime.utcnow)
    
    # Дополнительные поля
    subscription_type = Column(String(50), default='free')
    subscription_expires = Column(DateTime, nullable=True)
    settings = Column(Text, default='{}')  # JSON настройки
    balance = Column(Float, default=0.0)
    
class Chat(Base):
    """Модель чата"""
    __tablename__ = "chats"
    
    id = Column(Integer, primary_key=True, index=True)
    telegram_id = Column(Integer, unique=True, index=True, nullable=False)
    chat_type = Column(String(50), nullable=False)  # private, group, supergroup, channel
    title = Column(String(255), nullable=True)
    username = Column(String(255), nullable=True)
    description = Column(Text, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
class Message(Base):
    """Модель сообщения"""
    __tablename__ = "messages"
    
    id = Column(Integer, primary_key=True, index=True)
    message_id = Column(Integer, nullable=False)
    user_id = Column(Integer, nullable=False)
    chat_id = Column(Integer, nullable=False)
    text = Column(Text, nullable=True)
    message_type = Column(String(50), default='text')  # text, photo, document, etc.
    file_id = Column(String(255), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    
class BotStats(Base):
    """Статистика бота"""
    __tablename__ = "bot_stats"
    
    id = Column(Integer, primary_key=True, index=True)
    date = Column(DateTime, default=datetime.utcnow)
    total_users = Column(Integer, default=0)
    active_users = Column(Integer, default=0)
    messages_sent = Column(Integer, default=0)
    commands_used = Column(Integer, default=0)
    
# Создание всех таблиц
def create_tables():
    Base.metadata.create_all(bind=engine)

# Получение сессии базы данных
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Сервисы для работы с данными

# services/user_service.py
from sqlalchemy.orm import Session
from models.database import User, Chat, Message
from datetime import datetime
import json

class UserService:
    def __init__(self, db: Session):
        self.db = db
    
    def create_user(self, telegram_user):
        """Создание нового пользователя"""
        user = User(
            telegram_id=telegram_user.id,
            username=telegram_user.username,
            first_name=telegram_user.first_name,
            last_name=telegram_user.last_name,
            language_code=telegram_user.language_code,
            is_bot=telegram_user.is_bot
        )
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)
        return user
    
    def get_user(self, telegram_id: int):
        """Получение пользователя по Telegram ID"""
        return self.db.query(User).filter(User.telegram_id == telegram_id).first()
    
    def update_user_activity(self, telegram_id: int):
        """Обновление времени последней активности"""
        user = self.get_user(telegram_id)
        if user:
            user.last_activity = datetime.utcnow()
            self.db.commit()
    
    def update_user_settings(self, telegram_id: int, settings: dict):
        """Обновление настроек пользователя"""
        user = self.get_user(telegram_id)
        if user:
            user.settings = json.dumps(settings)
            user.updated_at = datetime.utcnow()
            self.db.commit()
    
    def get_user_settings(self, telegram_id: int):
        """Получение настроек пользователя"""
        user = self.get_user(telegram_id)
        if user and user.settings:
            return json.loads(user.settings)
        return {}
    
    def get_all_users(self, limit: int = 100, offset: int = 0):
        """Получение списка всех пользователей"""
        return self.db.query(User).offset(offset).limit(limit).all()
    
    def get_active_users(self, days: int = 7):
        """Получение активных пользователей за последние N дней"""
        from datetime import timedelta
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        return self.db.query(User).filter(User.last_activity >= cutoff_date).all()
    
    def update_balance(self, telegram_id: int, amount: float):
        """Обновление баланса пользователя"""
        user = self.get_user(telegram_id)
        if user:
            user.balance += amount
            self.db.commit()
            return user.balance
        return None

class ChatService:
    def __init__(self, db: Session):
        self.db = db
    
    def create_chat(self, telegram_chat):
        """Создание нового чата"""
        chat = Chat(
            telegram_id=telegram_chat.id,
            chat_type=telegram_chat.type,
            title=telegram_chat.title,
            username=telegram_chat.username,
            description=telegram_chat.description
        )
        self.db.add(chat)
        self.db.commit()
        self.db.refresh(chat)
        return chat
    
    def get_chat(self, telegram_id: int):
        """Получение чата по Telegram ID"""
        return self.db.query(Chat).filter(Chat.telegram_id == telegram_id).first()

class MessageService:
    def __init__(self, db: Session):
        self.db = db
    
    def save_message(self, message_data):
        """Сохранение сообщения"""
        message = Message(
            message_id=message_data['message_id'],
            user_id=message_data['user_id'],
            chat_id=message_data['chat_id'],
            text=message_data.get('text'),
            message_type=message_data.get('type', 'text'),
            file_id=message_data.get('file_id')
        )
        self.db.add(message)
        self.db.commit()
        self.db.refresh(message)
        return message
    
    def get_user_messages(self, user_id: int, limit: int = 50):
        """Получение сообщений пользователя"""
        return self.db.query(Message).filter(
            Message.user_id == user_id
        ).order_by(Message.created_at.desc()).limit(limit).all()

Интеграция PostgreSQL

Настройка PostgreSQL

# config/database.py
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# PostgreSQL конфигурация
POSTGRES_USER = os.getenv('POSTGRES_USER', 'bot_user')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'bot_password')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
POSTGRES_DB = os.getenv('POSTGRES_DB', 'telegram_bot')

# URL подключения к PostgreSQL
DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

# Создание движка
engine = create_engine(DATABASE_URL, echo=True)

# Создание сессии
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Docker Compose для PostgreSQL

# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:15
    container_name: telegram-bot-postgres
    environment:
      POSTGRES_DB: telegram_bot
      POSTGRES_USER: bot_user
      POSTGRES_PASSWORD: bot_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    networks:
      - bot-network

  telegram-bot:
    build: .
    container_name: telegram-bot
    environment:
      - BOT_TOKEN=${BOT_TOKEN}
      - DATABASE_URL=postgresql://bot_user:bot_password@postgres:5432/telegram_bot
    depends_on:
      - postgres
    networks:
      - bot-network

volumes:
  postgres_data:

networks:
  bot-network:
    driver: bridge

CRUD операции

Интеграция с ботом

# bot_with_database.py
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from sqlalchemy.orm import Session
from models.database import get_db, create_tables
from services.user_service import UserService
from services.chat_service import ChatService
from services.message_service import MessageService
import os
from dotenv import load_dotenv

load_dotenv()

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

BOT_TOKEN = os.getenv('BOT_TOKEN')

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Обработчик команды /start с сохранением в БД"""
    user = update.effective_user
    chat = update.effective_chat
    
    # Получение сессии базы данных
    db = next(get_db())
    
    try:
        # Создание сервисов
        user_service = UserService(db)
        chat_service = ChatService(db)
        
        # Проверка существования пользователя
        existing_user = user_service.get_user(user.id)
        if not existing_user:
            # Создание нового пользователя
            new_user = user_service.create_user(user)
            logger.info(f"Создан новый пользователь: {new_user.first_name}")
        else:
            # Обновление активности существующего пользователя
            user_service.update_user_activity(user.id)
        
        # Обработка чата
        existing_chat = chat_service.get_chat(chat.id)
        if not existing_chat:
            chat_service.create_chat(chat)
        
        await update.message.reply_text(
            f'Привет, {user.first_name}! 👋\n\n'
            'Я бот с интеграцией базы данных!\n'
            'Используй /profile для просмотра профиля.'
        )
        
    except Exception as e:
        logger.error(f"Ошибка при работе с БД: {e}")
        await update.message.reply_text("Произошла ошибка. Попробуйте позже.")
    finally:
        db.close()

async def profile(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Просмотр профиля пользователя"""
    user = update.effective_user
    
    db = next(get_db())
    try:
        user_service = UserService(db)
        db_user = user_service.get_user(user.id)
        
        if db_user:
            profile_text = f"""
👤 Профиль пользователя

🆔 Telegram ID: {db_user.telegram_id}
👤 Имя: {db_user.first_name}
📝 Username: @{db_user.username or 'не указан'}
🌐 Язык: {db_user.language_code}
💎 Премиум: {'Да' if db_user.is_premium else 'Нет'}
💰 Баланс: {db_user.balance} руб.
📅 Регистрация: {db_user.created_at.strftime('%d.%m.%Y %H:%M')}
🕐 Последняя активность: {db_user.last_activity.strftime('%d.%m.%Y %H:%M')}
            """
            await update.message.reply_text(profile_text, parse_mode='Markdown')
        else:
            await update.message.reply_text("Пользователь не найден в базе данных.")
            
    except Exception as e:
        logger.error(f"Ошибка при получении профиля: {e}")
        await update.message.reply_text("Произошла ошибка при получении профиля.")
    finally:
        db.close()

async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Статистика бота (только для админов)"""
    user = update.effective_user
    
    # Проверка прав администратора
    admin_ids = [int(x) for x in os.getenv('ADMIN_IDS', '').split(',') if x]
    if user.id not in admin_ids:
        await update.message.reply_text("❌ У вас нет прав для просмотра статистики.")
        return
    
    db = next(get_db())
    try:
        user_service = UserService(db)
        
        # Получение статистики
        total_users = len(user_service.get_all_users(limit=10000))
        active_users = len(user_service.get_active_users(days=7))
        
        stats_text = f"""
📊 Статистика бота

👥 Всего пользователей: {total_users}
🟢 Активных за неделю: {active_users}
📈 Активность: {(active_users/total_users*100):.1f}% если total_users > 0 else 0}%
        """
        
        await update.message.reply_text(stats_text, parse_mode='Markdown')
        
    except Exception as e:
        logger.error(f"Ошибка при получении статистики: {e}")
        await update.message.reply_text("Произошла ошибка при получении статистики.")
    finally:
        db.close()

async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Обработка всех сообщений с сохранением в БД"""
    user = update.effective_user
    chat = update.effective_chat
    message = update.message
    
    db = next(get_db())
    try:
        # Обновление активности пользователя
        user_service = UserService(db)
        user_service.update_user_activity(user.id)
        
        # Сохранение сообщения
        message_service = MessageService(db)
        message_data = {
            'message_id': message.message_id,
            'user_id': user.id,
            'chat_id': chat.id,
            'text': message.text,
            'type': 'text'
        }
        message_service.save_message(message_data)
        
        # Простой ответ
        await message.reply_text(f"Получено: {message.text}")
        
    except Exception as e:
        logger.error(f"Ошибка при обработке сообщения: {e}")
    finally:
        db.close()

def main():
    """Основная функция запуска бота"""
    if not BOT_TOKEN:
        logger.error("BOT_TOKEN не найден!")
        return
    
    # Создание таблиц базы данных
    create_tables()
    
    # Создание приложения
    application = Application.builder().token(BOT_TOKEN).build()
    
    # Добавление обработчиков
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("profile", profile))
    application.add_handler(CommandHandler("stats", stats))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    
    # Запуск бота
    logger.info("Бот с базой данных запущен!")
    application.run_polling()

if __name__ == '__main__':
    main()

Кэширование

Redis для кэширования

# services/cache_service.py
import redis
import json
from datetime import timedelta
import os

class CacheService:
    def __init__(self):
        self.redis_client = redis.Redis(
            host=os.getenv('REDIS_HOST', 'localhost'),
            port=int(os.getenv('REDIS_PORT', 6379)),
            db=int(os.getenv('REDIS_DB', 0)),
            decode_responses=True
        )
    
    def get_user_cache(self, user_id: int):
        """Получение кэша пользователя"""
        key = f"user:{user_id}"
        data = self.redis_client.get(key)
        return json.loads(data) if data else None
    
    def set_user_cache(self, user_id: int, data: dict, ttl: int = 3600):
        """Сохранение кэша пользователя"""
        key = f"user:{user_id}"
        self.redis_client.setex(key, ttl, json.dumps(data))
    
    def get_stats_cache(self):
        """Получение кэша статистики"""
        data = self.redis_client.get("bot_stats")
        return json.loads(data) if data else None
    
    def set_stats_cache(self, stats: dict, ttl: int = 300):
        """Сохранение кэша статистики"""
        self.redis_client.setex("bot_stats", ttl, json.dumps(stats))
    
    def clear_user_cache(self, user_id: int):
        """Очистка кэша пользователя"""
        key = f"user:{user_id}"
        self.redis_client.delete(key)

Миграции

Alembic для миграций

# migrations/env.py
from alembic import context
from sqlalchemy import engine_from_config, pool
from models.database import Base

# Конфигурация Alembic
config = context.config

# Импорт моделей для автогенерации
target_metadata = Base.metadata

def run_migrations_offline():
    """Запуск миграций в офлайн режиме"""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    """Запуск миграций в онлайн режиме"""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Заключение

В этой статье мы рассмотрели:
  • ✅ Выбор подходящей базы данных
  • ✅ Настройку SQLite и PostgreSQL
  • ✅ Создание моделей данных
  • ✅ CRUD операции через сервисы
  • ✅ Интеграцию с Telegram ботом
  • ✅ Кэширование с Redis
  • ✅ Миграции с Alembic
Интеграция с базой данных делает вашего бота более функциональным и позволяет хранить пользовательские данные, статистику и настройки.

Полезные ссылки

1475 просмотров
27 лайков
0 комментариев