Автоматическое создание AI-дайджестов из Telegram-каналов

В современном мире информации мы сталкиваемся с огромным потоком новостей из различных Telegram-каналов. Каждый день приходят сотни сообщений, и следить за всеми становится практически невозможно. В этой статье мы создадим систему автоматического создания интеллектуальных дайджестов новостей с группировкой по темам и автоматической публикацией в канале.

Зачем это нужно?

Проблема информационного шума

    • Слишком много источников — подписка на 20-30 каналов приводит к тысячам сообщений в день
    • Дублирование информации — одна новость пересказывается в разных каналах
    • Нет структурированности — все новости смешаны без группировки по темам
    • Трудно найти важное — ключевая информация теряется в потоке

Решение: AI-дайджесты

Мы создадим систему, которая:
    • Собирает новости из множества Telegram-каналов
    • Группирует по темам используя AI-классификацию
    • Убирает дубликаты с помощью векторного поиска
    • Создает краткие резюме каждой темы с помощью LLM
    • Публикует дайджест в целевой канал по расписанию

Что мы создадим?

Задача системы:
    • Мониторинг новостей из нескольких Telegram-каналов в реальном времени
    • Автоматическая группировка новостей по темам с помощью AI
    • Создание кратких резюме для каждой темы
    • Удаление дубликатов и похожих сообщений
    • Автоматическая публикация дайджестов в целевой канал по расписанию (например, каждый час или раз в день)
Что мы получим: Полнофункциональную систему автоматического создания дайджестов, которая превратит хаотичный поток новостей в структурированные, легко читаемые сводки.

Архитектура решения

Наша система состоит из следующих компонентов:
    • Telegram-монитор — собирает сообщения из каналов
    • Векторная база данных — для поиска дубликатов и похожих сообщений
    • AI-классификатор — группирует новости по темам
    • LLM-суммаризатор — создает краткие резюме
    • Планировщик публикаций — публикует дайджесты по расписанию

Шаг 1: Установка зависимостей

pip install telethon qdrant-client openai httpx python-dotenv schedule aiofiles

Шаг 2: Настройка переменных окружения

Создайте файл .env:
# Telegram API (получить на https://my.telegram.org/)
API_ID=12345678
API_HASH=your_api_hash_here

OpenAI для эмбеддингов и суммирования

OPENAI_API_KEY=your_openai_key

Qdrant для векторного поиска

QDRANT_URL=https://your-qdrant-instance.com QDRANT_API_KEY=your_qdrant_key

Список каналов для мониторинга (через запятую)

SOURCE_CHANNELS=@channel1,@channel2,@channel3

Целевой канал для публикации дайджестов

DEST_CHANNEL=@your_digest_channel

Настройки

COLLECTION_NAME=telegram_digest SIMILARITY_THRESHOLD=0.85 EMBEDDING_MODEL=text-embedding-3-small LLM_MODEL=gpt-4o-mini

Расписание публикации (cron format или простое время)

PUBLISH_SCHEDULE=hourly # hourly, daily, weekly

Шаг 3: Создание основной системы

Создайте файл telegram_digest_bot.py:
import asyncio
import os
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from collections import defaultdict
import schedule
import time

from telethon import TelegramClient, events
from telethon.tl.types import Message
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from openai import OpenAI
import httpx
from dotenv import load_dotenv

Загружаем переменные окружения

load_dotenv()

Настройка логирования

logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__)

Переменные окружения

API_ID = int(os.getenv("API_ID", "0")) API_HASH = os.getenv("API_HASH") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") QDRANT_URL = os.getenv("QDRANT_URL") QDRANT_API_KEY = os.getenv("QDRANT_API_KEY") SOURCE_CHANNELS = [ch.strip() for ch in os.getenv("SOURCE_CHANNELS", "").split(",") if ch.strip()] DEST_CHANNEL = os.getenv("DEST_CHANNEL") COLLECTION_NAME = os.getenv("COLLECTION_NAME", "telegram_digest") SIMILARITY_THRESHOLD = float(os.getenv("SIMILARITY_THRESHOLD", "0.85")) EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small") LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini") PUBLISH_SCHEDULE = os.getenv("PUBLISH_SCHEDULE", "hourly")

Глобальные клиенты

telegram_client = None qdrant_client = None openai_client = None

Хранилище новостей для текущего периода

news_buffer: Dict[str, List[Dict]] = defaultdict(list) news_processed_ids = set() async def init_clients(): """Инициализация всех клиентов""" global telegram_client, qdrant_client, openai_client # Telegram клиент telegram_client = TelegramClient('digest_bot_session', API_ID, API_HASH) await telegram_client.start() logger.info("Telegram client initialized") # Qdrant клиент qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY) # Создаем коллекцию, если её нет try: collections = qdrant_client.get_collections().collections collection_names = [c.name for c in collections] if COLLECTION_NAME not in collection_names: qdrant_client.create_collection( collection_name=COLLECTION_NAME, vectors_config=VectorParams( size=1536, # Размерность для text-embedding-3-small distance=Distance.COSINE ) ) logger.info(f"Created collection {COLLECTION_NAME}") else: logger.info(f"Collection {COLLECTION_NAME} already exists") except Exception as e: logger.error(f"Error working with Qdrant: {e}") raise # OpenAI клиент if OPENAI_API_KEY: openai_client = OpenAI(api_key=OPENAI_API_KEY) logger.info("OpenAI client initialized") else: logger.warning("OPENAI_API_KEY not set") def get_text_embedding(text: str) -> Optional[List[float]]: """Получение векторного представления текста""" if not openai_client: return None try: response = openai_client.embeddings.create( model=EMBEDDING_MODEL, input=text ) return response.data[0].embedding except Exception as e: logger.error(f"Error getting embedding: {e}") return None def is_duplicate(text: str, threshold: float = SIMILARITY_THRESHOLD) -> bool: """Проверка, является ли сообщение дубликатом""" embedding = get_text_embedding(text) if not embedding: return False try: # Поиск похожих векторов search_results = qdrant_client.search( collection_name=COLLECTION_NAME, query_vector=embedding, limit=1, score_threshold=threshold ) if search_results and search_results[0].score >= threshold: logger.info(f"Found duplicate with score {search_results[0].score:.2f}") return True return False except Exception as e: logger.error(f"Error checking duplicate: {e}") return False def save_message_to_vector_db(text: str, metadata: Dict): """Сохранение сообщения в векторную базу""" embedding = get_text_embedding(text) if not embedding: return try: point_id = hash(text) % (263) # Простой ID на основе хеша point = PointStruct( id=point_id, vector=embedding, payload={ "text": text, "timestamp": metadata.get("timestamp", datetime.now().isoformat()), "source": metadata.get("source", "unknown"), "message_id": metadata.get("message_id") } ) qdrant_client.upsert( collection_name=COLLECTION_NAME, points=[point] ) except Exception as e: logger.error(f"Error saving to vector DB: {e}") async def classify_topic(text: str) -> str: """Классификация темы сообщения с помощью LLM""" if not openai_client: return "other" try: prompt = f"""Определи основную тему следующего сообщения из Telegram-канала. Выбери одну из категорий: IT, финансы, криптовалюты, политика, наука, технологии, бизнес, другое. Сообщение: {text[:500]} Верни только название категории, без дополнительных объяснений.""" response = openai_client.chat.completions.create( model=LLM_MODEL, messages=[ {"role": "system", "content": "Ты эксперт по классификации новостей. Отвечай только названием категории."}, {"role": "user", "content": prompt} ], temperature=0.3, max_tokens=20 ) topic = response.choices[0].message.content.strip().lower() return topic if topic in ["it", "финансы", "криптовалюты", "политика", "наука", "технологии", "бизнес", "другое"] else "другое" except Exception as e: logger.error(f"Error classifying topic: {e}") return "другое" async def summarize_topic(news_list: List[Dict]) -> str: """Создание краткого резюме для группы новостей""" if not openai_client or not news_list: return "" try: # Формируем список новостей для суммирования news_texts = "\n".join([f"- {news['text'][:200]}" for news in news_list[:10]]) # Ограничиваем до 10 новостей prompt = f"""Создай краткое резюме следующих новостей на русском языке. Резюме должно быть информативным, но кратким (2-3 предложения). Новости: {news_texts} Резюме:""" response = openai_client.chat.completions.create( model=LLM_MODEL, messages=[ {"role": "system", "content": "Ты опытный журналист, создающий краткие дайджесты новостей."}, {"role": "user", "content": prompt} ], temperature=0.5, max_tokens=200 ) return response.choices[0].message.content.strip() except Exception as e: logger.error(f"Error summarizing topic: {e}") return "Не удалось создать резюме" async def create_digest() -> str: """Создание дайджеста из всех новостей в буфере""" if not news_buffer: return "Нет новостей для дайджеста" digest_parts = [] digest_parts.append("📰 Дайджест новостей") digest_parts.append(f"🕐 {datetime.now().strftime('%d.%m.%Y %H:%M')}") digest_parts.append("") # Группируем по темам for topic, news_list in sorted(news_buffer.items()): if not news_list: continue # Определяем эмодзи для темы topic_emojis = { "it": "💻", "финансы": "💰", "криптовалюты": "₿", "политика": "🏛️", "наука": "🔬", "технологии": "⚙️", "бизнес": "📊", "другое": "📌" } emoji = topic_emojis.get(topic, "📌") digest_parts.append(f"{emoji} {topic.upper()} ({len(news_list)} новостей)") # Добавляем резюме summary = await summarize_topic(news_list) if summary: digest_parts.append(f"📝 {summary}") digest_parts.append("") # Добавляем список ключевых новостей (первые 3) digest_parts.append("Ключевые новости:") for news in news_list[:3]: text = news['text'][:150] + "..." if len(news['text']) > 150 else news['text'] source = news.get('source', 'unknown') digest_parts.append(f"• {text} (источник: {source})") digest_parts.append("") digest_parts.append("---") digest_parts.append("") # Добавляем статистику total_news = sum(len(news_list) for news_list in news_buffer.values()) digest_parts.append(f"📊 Всего обработано: {total_news} новостей") digest_parts.append(f"📁 Групп по темам: {len(news_buffer)}") return "\n".join(digest_parts) async def publish_digest(): """Публикация дайджеста в целевой канал""" if not telegram_client or not DEST_CHANNEL: logger.warning("Telegram client or destination channel not set") return try: digest_text = await create_digest() if digest_text == "Нет новостей для дайджеста": logger.info("No news to publish") return await telegram_client.send_message(DEST_CHANNEL, digest_text) logger.info(f"Digest published to {DEST_CHANNEL}") # Очищаем буфер после публикации news_buffer.clear() news_processed_ids.clear() except Exception as e: logger.error(f"Error publishing digest: {e}") @telegram_client.on(events.NewMessage(chats=SOURCE_CHANNELS)) async def handle_new_message(event): """Обработчик новых сообщений из каналов""" message: Message = event.message # Пропускаем служебные сообщения if not message.text or len(message.text.strip()) < 10: return text = message.text.strip() message_id = f"{event.chat_id}_{message.id}" # Проверяем, не обрабатывали ли мы уже это сообщение if message_id in news_processed_ids: return # Проверяем на дубликаты if is_duplicate(text): logger.info(f"Duplicate message filtered: {text[:50]}...") return # Сохраняем в векторную базу save_message_to_vector_db(text, { "timestamp": message.date.isoformat(), "source": event.chat.username or str(event.chat_id), "message_id": message.id }) # Классифицируем тему topic = await classify_topic(text) # Добавляем в буфер news_buffer[topic].append({ "text": text, "source": event.chat.username or str(event.chat_id), "timestamp": message.date.isoformat(), "message_id": message.id }) news_processed_ids.add(message_id) logger.info(f"Processed message: {text[:50]}... (topic: {topic})") async def schedule_publisher(): """Планировщик публикации дайджестов""" # Настраиваем расписание в зависимости от PUBLISH_SCHEDULE if PUBLISH_SCHEDULE == "hourly": schedule.every().hour.do(lambda: asyncio.create_task(publish_digest())) elif PUBLISH_SCHEDULE == "daily": schedule.every().day.at("09:00").do(lambda: asyncio.create_task(publish_digest())) elif PUBLISH_SCHEDULE == "weekly": schedule.every().monday.at("09:00").do(lambda: asyncio.create_task(publish_digest())) # Запускаем планировщик в отдельном потоке while True: schedule.run_pending() await asyncio.sleep(60) # Проверяем каждую минуту async def main(): """Главная функция""" await init_clients() logger.info("Starting Telegram digest bot...") logger.info(f"Monitoring channels: {SOURCE_CHANNELS}") logger.info(f"Destination channel: {DEST_CHANNEL}") logger.info(f"Publish schedule: {PUBLISH_SCHEDULE}") # Запускаем планировщик в фоне asyncio.create_task(schedule_publisher()) # Запускаем мониторинг Telegram await telegram_client.run_until_disconnected() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Bot stopped by user") except Exception as e: logger.exception(f"Fatal error: {e}")

Шаг 4: Запуск системы

    • Заполните .env файл со всеми необходимыми переменными
    • Запустите бота:
python telegram_digest_bot.py
    • При первой запуске введите код из Telegram для авторизации
    • Система начнет мониторинг и автоматически создавать дайджесты

Пример результата

Вот как будет выглядеть дайджест в канале:
📰 Дайджест новостей
🕐 15.01.2025 14:00

💻 IT (5 новостей)
📝 OpenAI представил новую модель GPT-5 с улучшенными возможностями. Компания также анонсировала снижение цен на API для разработчиков.

Ключевые новости:
• OpenAI представил GPT-5 с улучшенной производительностью...
• Microsoft инвестирует $10 млрд в развитие искусственного интеллекта...
• Google объявил о запуске новой версии Bard...


💰 ФИНАНСЫ (3 новости) 📝 Центральный банк объявил о новых мерах по регулированию криптовалют. Эксперты прогнозируют рост курса доллара. Ключевые новости: • ЦБ РФ ужесточил требования к криптовалютным операциям... • Акции крупных IT-компаний выросли на 5%...
📊 Всего обработано: 8 новостей 📁 Групп по темам: 2

Дополнительные возможности

Настройка фильтрации

Вы можете добавить фильтрацию по ключевым словам перед обработкой:
KEYWORDS_FILTER = ["новости", "анонс", "релиз", "обновление"]  # Или другие ключевые слова

def should_process_message(text: str) -> bool:
    """Проверка, нужно ли обрабатывать сообщение"""
    text_lower = text.lower()
    return any(keyword in text_lower for keyword in KEYWORDS_FILTER)

Экспорт в другие форматы

Можно добавить экспорт дайджестов в другие форматы:
async def export_to_json():
    """Экспорт дайджеста в JSON"""
    digest_data = {
        "timestamp": datetime.now().isoformat(),
        "topics": {
            topic: [
                {
                    "text": news["text"],
                    "source": news["source"],
                    "timestamp": news["timestamp"]
                }
                for news in news_list
            ]
            for topic, news_list in news_buffer.items()
        }
    }
    
    with open(f"digest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", "w", encoding="utf-8") as f:
        json.dump(digest_data, f, ensure_ascii=False, indent=2)

Оптимизация и масштабирование

Для больших объемов данных

    • Используйте батчинг — обрабатывайте сообщения группами
    • Кэшируйте эмбеддинги — сохраняйте уже вычисленные векторы
    • Используйте более дешевые модели для первичной фильтрации
    • Добавьте очередь сообщений (Redis, RabbitMQ) для обработки

Мониторинг и логирование

Добавьте метрики для отслеживания работы системы:
metrics = {
    "messages_processed": 0,
    "duplicates_filtered": 0,
    "digests_published": 0,
    "errors": 0
}

Заключение

Мы создали систему автоматического создания AI-дайджестов из Telegram-каналов, которая:
    • ✅ Собирает новости из множества источников
    • ✅ Убирает дубликаты с помощью векторного поиска
    • ✅ Группирует новости по темам с помощью AI
    • ✅ Создает краткие резюме для каждой темы
    • ✅ Публикует структурированные дайджесты по расписанию
Такая система поможет превратить хаотичный поток информации в структурированные, легко читаемые сводки, экономя время и позволяя не пропустить важные новости.
Полезные ресурсы:

572 просмотров
0 лайков
0 комментариев