Автоматическое создание AI-дайджестов из Telegram-каналов
В современном мире информации мы сталкиваемся с огромным потоком новостей из различных Telegram-каналов. Каждый день приходят сотни сообщений, и следить за всеми становится практически невозможно. В этой статье мы создадим систему автоматического создания интеллектуальных дайджестов новостей с группировкой по темам и автоматической публикацией в канале.Зачем это нужно?
Проблема информационного шума
- Слишком много источников — подписка на 20-30 каналов приводит к тысячам сообщений в день
- Дублирование информации — одна новость пересказывается в разных каналах
- Нет структурированности — все новости смешаны без группировки по темам
- Трудно найти важное — ключевая информация теряется в потоке
Решение: AI-дайджесты
Мы создадим систему, которая:- Собирает новости из множества Telegram-каналов
- Группирует по темам используя AI-классификацию
- Убирает дубликаты с помощью векторного поиска
- Создает краткие резюме каждой темы с помощью LLM
- Публикует дайджест в целевой канал по расписанию
Что мы создадим?
Задача системы:- Мониторинг новостей из нескольких Telegram-каналов в реальном времени
- Автоматическая группировка новостей по темам с помощью AI
- Создание кратких резюме для каждой темы
- Удаление дубликатов и похожих сообщений
- Автоматическая публикация дайджестов в целевой канал по расписанию (например, каждый час или раз в день)
Архитектура решения
Наша система состоит из следующих компонентов:- Telegram-монитор — собирает сообщения из каналов
- Векторная база данных — для поиска дубликатов и похожих сообщений
- AI-классификатор — группирует новости по темам
- LLM-суммаризатор — создает краткие резюме
- Планировщик публикаций — публикует дайджесты по расписанию
Шаг 1: Установка зависимостей
pip install telethon qdrant-client openai httpx python-dotenv schedule aiofilesШаг 2: Настройка переменных окружения
Создайте файл.env:
# Telegram API (получить на https://my.telegram.org/)
API_ID=12345678
API_HASH=your_api_hash_here
OpenAI для эмбеддингов и суммирования
OPENAI_API_KEY=your_openai_key
Qdrant для векторного поиска
QDRANT_URL=https://your-qdrant-instance.com
QDRANT_API_KEY=your_qdrant_key
Список каналов для мониторинга (через запятую)
SOURCE_CHANNELS=@channel1,@channel2,@channel3
Целевой канал для публикации дайджестов
DEST_CHANNEL=@your_digest_channel
Настройки
COLLECTION_NAME=telegram_digest
SIMILARITY_THRESHOLD=0.85
EMBEDDING_MODEL=text-embedding-3-small
LLM_MODEL=gpt-4o-mini
Расписание публикации (cron format или простое время)
PUBLISH_SCHEDULE=hourly # hourly, daily, weeklyШаг 3: Создание основной системы
Создайте файлtelegram_digest_bot.py:
import asyncio
import os
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from collections import defaultdict
import schedule
import time
from telethon import TelegramClient, events
from telethon.tl.types import Message
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from openai import OpenAI
import httpx
from dotenv import load_dotenv
Загружаем переменные окружения
load_dotenv()
Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Переменные окружения
API_ID = int(os.getenv("API_ID", "0"))
API_HASH = os.getenv("API_HASH")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
SOURCE_CHANNELS = [ch.strip() for ch in os.getenv("SOURCE_CHANNELS", "").split(",") if ch.strip()]
DEST_CHANNEL = os.getenv("DEST_CHANNEL")
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "telegram_digest")
SIMILARITY_THRESHOLD = float(os.getenv("SIMILARITY_THRESHOLD", "0.85"))
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")
PUBLISH_SCHEDULE = os.getenv("PUBLISH_SCHEDULE", "hourly")
Глобальные клиенты
telegram_client = None
qdrant_client = None
openai_client = None
Хранилище новостей для текущего периода
news_buffer: Dict[str, List[Dict]] = defaultdict(list)
news_processed_ids = set()
async def init_clients():
"""Инициализация всех клиентов"""
global telegram_client, qdrant_client, openai_client
# Telegram клиент
telegram_client = TelegramClient('digest_bot_session', API_ID, API_HASH)
await telegram_client.start()
logger.info("Telegram client initialized")
# Qdrant клиент
qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
# Создаем коллекцию, если её нет
try:
collections = qdrant_client.get_collections().collections
collection_names = [c.name for c in collections]
if COLLECTION_NAME not in collection_names:
qdrant_client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config=VectorParams(
size=1536, # Размерность для text-embedding-3-small
distance=Distance.COSINE
)
)
logger.info(f"Created collection {COLLECTION_NAME}")
else:
logger.info(f"Collection {COLLECTION_NAME} already exists")
except Exception as e:
logger.error(f"Error working with Qdrant: {e}")
raise
# OpenAI клиент
if OPENAI_API_KEY:
openai_client = OpenAI(api_key=OPENAI_API_KEY)
logger.info("OpenAI client initialized")
else:
logger.warning("OPENAI_API_KEY not set")
def get_text_embedding(text: str) -> Optional[List[float]]:
"""Получение векторного представления текста"""
if not openai_client:
return None
try:
response = openai_client.embeddings.create(
model=EMBEDDING_MODEL,
input=text
)
return response.data[0].embedding
except Exception as e:
logger.error(f"Error getting embedding: {e}")
return None
def is_duplicate(text: str, threshold: float = SIMILARITY_THRESHOLD) -> bool:
"""Проверка, является ли сообщение дубликатом"""
embedding = get_text_embedding(text)
if not embedding:
return False
try:
# Поиск похожих векторов
search_results = qdrant_client.search(
collection_name=COLLECTION_NAME,
query_vector=embedding,
limit=1,
score_threshold=threshold
)
if search_results and search_results[0].score >= threshold:
logger.info(f"Found duplicate with score {search_results[0].score:.2f}")
return True
return False
except Exception as e:
logger.error(f"Error checking duplicate: {e}")
return False
def save_message_to_vector_db(text: str, metadata: Dict):
"""Сохранение сообщения в векторную базу"""
embedding = get_text_embedding(text)
if not embedding:
return
try:
point_id = hash(text) % (263) # Простой ID на основе хеша
point = PointStruct(
id=point_id,
vector=embedding,
payload={
"text": text,
"timestamp": metadata.get("timestamp", datetime.now().isoformat()),
"source": metadata.get("source", "unknown"),
"message_id": metadata.get("message_id")
}
)
qdrant_client.upsert(
collection_name=COLLECTION_NAME,
points=[point]
)
except Exception as e:
logger.error(f"Error saving to vector DB: {e}")
async def classify_topic(text: str) -> str:
"""Классификация темы сообщения с помощью LLM"""
if not openai_client:
return "other"
try:
prompt = f"""Определи основную тему следующего сообщения из Telegram-канала.
Выбери одну из категорий: IT, финансы, криптовалюты, политика, наука, технологии, бизнес, другое.
Сообщение: {text[:500]}
Верни только название категории, без дополнительных объяснений."""
response = openai_client.chat.completions.create(
model=LLM_MODEL,
messages=[
{"role": "system", "content": "Ты эксперт по классификации новостей. Отвечай только названием категории."},
{"role": "user", "content": prompt}
],
temperature=0.3,
max_tokens=20
)
topic = response.choices[0].message.content.strip().lower()
return topic if topic in ["it", "финансы", "криптовалюты", "политика", "наука", "технологии", "бизнес", "другое"] else "другое"
except Exception as e:
logger.error(f"Error classifying topic: {e}")
return "другое"
async def summarize_topic(news_list: List[Dict]) -> str:
"""Создание краткого резюме для группы новостей"""
if not openai_client or not news_list:
return ""
try:
# Формируем список новостей для суммирования
news_texts = "\n".join([f"- {news['text'][:200]}" for news in news_list[:10]]) # Ограничиваем до 10 новостей
prompt = f"""Создай краткое резюме следующих новостей на русском языке.
Резюме должно быть информативным, но кратким (2-3 предложения).
Новости:
{news_texts}
Резюме:"""
response = openai_client.chat.completions.create(
model=LLM_MODEL,
messages=[
{"role": "system", "content": "Ты опытный журналист, создающий краткие дайджесты новостей."},
{"role": "user", "content": prompt}
],
temperature=0.5,
max_tokens=200
)
return response.choices[0].message.content.strip()
except Exception as e:
logger.error(f"Error summarizing topic: {e}")
return "Не удалось создать резюме"
async def create_digest() -> str:
"""Создание дайджеста из всех новостей в буфере"""
if not news_buffer:
return "Нет новостей для дайджеста"
digest_parts = []
digest_parts.append("📰 Дайджест новостей")
digest_parts.append(f"🕐 {datetime.now().strftime('%d.%m.%Y %H:%M')}")
digest_parts.append("")
# Группируем по темам
for topic, news_list in sorted(news_buffer.items()):
if not news_list:
continue
# Определяем эмодзи для темы
topic_emojis = {
"it": "💻",
"финансы": "💰",
"криптовалюты": "₿",
"политика": "🏛️",
"наука": "🔬",
"технологии": "⚙️",
"бизнес": "📊",
"другое": "📌"
}
emoji = topic_emojis.get(topic, "📌")
digest_parts.append(f"{emoji} {topic.upper()} ({len(news_list)} новостей)")
# Добавляем резюме
summary = await summarize_topic(news_list)
if summary:
digest_parts.append(f"📝 {summary}")
digest_parts.append("")
# Добавляем список ключевых новостей (первые 3)
digest_parts.append("Ключевые новости:")
for news in news_list[:3]:
text = news['text'][:150] + "..." if len(news['text']) > 150 else news['text']
source = news.get('source', 'unknown')
digest_parts.append(f"• {text} (источник: {source})")
digest_parts.append("")
digest_parts.append("---")
digest_parts.append("")
# Добавляем статистику
total_news = sum(len(news_list) for news_list in news_buffer.values())
digest_parts.append(f"📊 Всего обработано: {total_news} новостей")
digest_parts.append(f"📁 Групп по темам: {len(news_buffer)}")
return "\n".join(digest_parts)
async def publish_digest():
"""Публикация дайджеста в целевой канал"""
if not telegram_client or not DEST_CHANNEL:
logger.warning("Telegram client or destination channel not set")
return
try:
digest_text = await create_digest()
if digest_text == "Нет новостей для дайджеста":
logger.info("No news to publish")
return
await telegram_client.send_message(DEST_CHANNEL, digest_text)
logger.info(f"Digest published to {DEST_CHANNEL}")
# Очищаем буфер после публикации
news_buffer.clear()
news_processed_ids.clear()
except Exception as e:
logger.error(f"Error publishing digest: {e}")
@telegram_client.on(events.NewMessage(chats=SOURCE_CHANNELS))
async def handle_new_message(event):
"""Обработчик новых сообщений из каналов"""
message: Message = event.message
# Пропускаем служебные сообщения
if not message.text or len(message.text.strip()) < 10:
return
text = message.text.strip()
message_id = f"{event.chat_id}_{message.id}"
# Проверяем, не обрабатывали ли мы уже это сообщение
if message_id in news_processed_ids:
return
# Проверяем на дубликаты
if is_duplicate(text):
logger.info(f"Duplicate message filtered: {text[:50]}...")
return
# Сохраняем в векторную базу
save_message_to_vector_db(text, {
"timestamp": message.date.isoformat(),
"source": event.chat.username or str(event.chat_id),
"message_id": message.id
})
# Классифицируем тему
topic = await classify_topic(text)
# Добавляем в буфер
news_buffer[topic].append({
"text": text,
"source": event.chat.username or str(event.chat_id),
"timestamp": message.date.isoformat(),
"message_id": message.id
})
news_processed_ids.add(message_id)
logger.info(f"Processed message: {text[:50]}... (topic: {topic})")
async def schedule_publisher():
"""Планировщик публикации дайджестов"""
# Настраиваем расписание в зависимости от PUBLISH_SCHEDULE
if PUBLISH_SCHEDULE == "hourly":
schedule.every().hour.do(lambda: asyncio.create_task(publish_digest()))
elif PUBLISH_SCHEDULE == "daily":
schedule.every().day.at("09:00").do(lambda: asyncio.create_task(publish_digest()))
elif PUBLISH_SCHEDULE == "weekly":
schedule.every().monday.at("09:00").do(lambda: asyncio.create_task(publish_digest()))
# Запускаем планировщик в отдельном потоке
while True:
schedule.run_pending()
await asyncio.sleep(60) # Проверяем каждую минуту
async def main():
"""Главная функция"""
await init_clients()
logger.info("Starting Telegram digest bot...")
logger.info(f"Monitoring channels: {SOURCE_CHANNELS}")
logger.info(f"Destination channel: {DEST_CHANNEL}")
logger.info(f"Publish schedule: {PUBLISH_SCHEDULE}")
# Запускаем планировщик в фоне
asyncio.create_task(schedule_publisher())
# Запускаем мониторинг Telegram
await telegram_client.run_until_disconnected()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Bot stopped by user")
except Exception as e:
logger.exception(f"Fatal error: {e}")Шаг 4: Запуск системы
- Заполните
.envфайл со всеми необходимыми переменными
- Запустите бота:
python telegram_digest_bot.py- При первой запуске введите код из Telegram для авторизации
- Система начнет мониторинг и автоматически создавать дайджесты
Пример результата
Вот как будет выглядеть дайджест в канале:📰 Дайджест новостей
🕐 15.01.2025 14:00
💻 IT (5 новостей)
📝 OpenAI представил новую модель GPT-5 с улучшенными возможностями. Компания также анонсировала снижение цен на API для разработчиков.
Ключевые новости:
• OpenAI представил GPT-5 с улучшенной производительностью...
• Microsoft инвестирует $10 млрд в развитие искусственного интеллекта...
• Google объявил о запуске новой версии Bard...
💰 ФИНАНСЫ (3 новости)
📝 Центральный банк объявил о новых мерах по регулированию криптовалют. Эксперты прогнозируют рост курса доллара.
Ключевые новости:
• ЦБ РФ ужесточил требования к криптовалютным операциям...
• Акции крупных IT-компаний выросли на 5%...
📊 Всего обработано: 8 новостей
📁 Групп по темам: 2Дополнительные возможности
Настройка фильтрации
Вы можете добавить фильтрацию по ключевым словам перед обработкой:KEYWORDS_FILTER = ["новости", "анонс", "релиз", "обновление"] # Или другие ключевые слова
def should_process_message(text: str) -> bool:
"""Проверка, нужно ли обрабатывать сообщение"""
text_lower = text.lower()
return any(keyword in text_lower for keyword in KEYWORDS_FILTER)Экспорт в другие форматы
Можно добавить экспорт дайджестов в другие форматы:async def export_to_json():
"""Экспорт дайджеста в JSON"""
digest_data = {
"timestamp": datetime.now().isoformat(),
"topics": {
topic: [
{
"text": news["text"],
"source": news["source"],
"timestamp": news["timestamp"]
}
for news in news_list
]
for topic, news_list in news_buffer.items()
}
}
with open(f"digest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", "w", encoding="utf-8") as f:
json.dump(digest_data, f, ensure_ascii=False, indent=2)Оптимизация и масштабирование
Для больших объемов данных
- Используйте батчинг — обрабатывайте сообщения группами
- Кэшируйте эмбеддинги — сохраняйте уже вычисленные векторы
- Используйте более дешевые модели для первичной фильтрации
- Добавьте очередь сообщений (Redis, RabbitMQ) для обработки
Мониторинг и логирование
Добавьте метрики для отслеживания работы системы:metrics = {
"messages_processed": 0,
"duplicates_filtered": 0,
"digests_published": 0,
"errors": 0
}Заключение
Мы создали систему автоматического создания AI-дайджестов из Telegram-каналов, которая:- ✅ Собирает новости из множества источников
- ✅ Убирает дубликаты с помощью векторного поиска
- ✅ Группирует новости по темам с помощью AI
- ✅ Создает краткие резюме для каждой темы
- ✅ Публикует структурированные дайджесты по расписанию
Полезные ресурсы:
572 просмотров
0 лайков
0 комментариев
Комментарии (0)
Пока нет комментариев. Будьте первым!