Telegram-бот с GPT: от идеи до продакшена за неделю

AI и LLM
Telegram-бот с GPT: от идеи до продакшена за неделю

TL;DR: За неделю можно собрать полноценный Telegram-бот с GPT — от базового MVP до продакшен-готового решения. Главное — правильная архитектура, асинхронность и грамотная обработка ошибок. Покажу конкретный стек и подводные камни.

Архитектура: что нужно для старта

Минимальный стек для Telegram-бота с GPT выглядит просто:

  • Python 3.9+ с aiogram для Telegram API
  • OpenAI API или другой LLM-провайдер
  • PostgreSQL для хранения контекста
  • Redis для кеширования и rate limiting
  • Docker для деплоя

Типовая архитектура включает три основных компонента:

  1. Bot Handler — обрабатывает сообщения от Telegram
  2. LLM Service — работает с GPT API
  3. Context Manager — управляет историей диалогов
# Базовая структура бота
from aiogram import Bot, Dispatcher, types
from aiogram.contrib.fsm_storage.redis import RedisStorage2
import openai

class ChatBot:
    def __init__(self):
        self.bot = Bot(token=TELEGRAM_TOKEN)
        self.storage = RedisStorage2('redis://localhost')
        self.dp = Dispatcher(self.bot, storage=self.storage)
        
    async def process_message(self, message: types.Message):
        # Получаем контекст
        context = await self.get_context(message.from_user.id)
        
        # Отправляем в GPT
        response = await self.get_gpt_response(message.text, context)
        
        # Сохраняем и отвечаем
        await self.save_context(message.from_user.id, message.text, response)
        await message.reply(response)

День 1-2: MVP и базовая функциональность

Первые два дня уходят на создание базового функционала. Главное — не переусложнять.

Обработка сообщений

@dp.message_handler()
async def handle_message(message: types.Message):
    try:
        # Показываем typing
        await bot.send_chat_action(message.chat.id, 'typing')
        
        # Базовая валидация
        if len(message.text) > 4000:
            await message.reply("Сообщение слишком длинное")
            return
            
        # Получаем ответ от GPT
        response = await get_gpt_response(message.text)
        await message.reply(response)
        
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        await message.reply("Произошла ошибка, попробуйте позже")

Интеграция с OpenAI

async def get_gpt_response(user_message: str, context: list = None):
    messages = [
        {"role": "system", "content": "Ты полезный помощник"}
    ]
    
    if context:
        messages.extend(context)
    
    messages.append({"role": "user", "content": user_message})
    
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=1000,
        temperature=0.7
    )
    
    return response.choices[0].message.content

День 3-4: контекст и персистентность

На этом этапе добавляем память бота — без неё диалог получается рваным.

Управление контекстом

class ContextManager:
    def __init__(self, redis_client, max_context_length=10):
        self.redis = redis_client
        self.max_length = max_context_length
    
    async def get_context(self, user_id: int) -> list:
        context_key = f"context:{user_id}"
        context_data = await self.redis.get(context_key)
        
        if context_data:
            return json.loads(context_data)
        return []
    
    async def add_message(self, user_id: int, role: str, content: str):
        context = await self.get_context(user_id)
        context.append({"role": role, "content": content})
        
        # Ограничиваем длину контекста
        if len(context) > self.max_length:
            context = context[-self.max_length:]
        
        context_key = f"context:{user_id}"
        await self.redis.setex(
            context_key, 
            3600,  # TTL 1 час
            json.dumps(context)
        )

База данных для долгосрочного хранения

# models.py
from sqlalchemy import Column, Integer, Text, DateTime, BigInteger
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class ChatMessage(Base):
    __tablename__ = 'chat_messages'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(BigInteger, nullable=False)
    message_text = Column(Text, nullable=False)
    bot_response = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

День 5-6: продакшен-готовность

Здесь фокус на надёжности и производительности.

Rate limiting и защита от спама

from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram.utils.exceptions import Throttled

class ThrottlingMiddleware(BaseMiddleware):
    def __init__(self, limit=1, key_prefix='antiflood_'):
        self.rate_limit = limit
        self.prefix = key_prefix
        super().__init__()

    async def on_process_message(self, message: types.Message, data: dict):
        dispatcher = Dispatcher.get_current()
        
        try:
            await dispatcher.throttle(
                self.prefix + str(message.from_user.id), 
                rate=self.rate_limit
            )
        except Throttled:
            await message.reply("Не так быстро! Подождите немного.")
            raise

Обработка ошибок и мониторинг

import logging
from functools import wraps

def handle_errors(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except openai.RateLimitError:
            await args[0].reply("Превышен лимит запросов, попробуйте позже")
        except openai.InvalidRequestError as e:
            logger.error(f"Invalid OpenAI request: {e}")
            await args[0].reply("Некорректный запрос")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            await args[0].reply("Произошла ошибка")
    return wrapper

@handle_errors
async def process_gpt_request(message: types.Message):
    # Основная логика
    pass

День 7: деплой и масштабирование

Docker-контейнеризация

FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "bot.py"]

docker-compose для локальной разработки

version: '3.8'
services:
  bot:
    build: .
    environment:
      - TELEGRAM_TOKEN=${TELEGRAM_TOKEN}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - postgres

  redis:
    image: redis:alpine
    
  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: botdb
      POSTGRES_USER: botuser
      POSTGRES_PASSWORD: botpass

Подводные камни и оптимизации

Управление токенами

GPT API считает токены, а не символы. Для контроля расходов нужно отслеживать потребление:

import tiktoken

def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

async def check_token_limit(messages: list, max_tokens: int = 3000):
    total_tokens = sum(count_tokens(msg["content"]) for msg in messages)
    
    if total_tokens > max_tokens:
        # Обрезаем старые сообщения
        while total_tokens > max_tokens and len(messages) > 1:
            messages.pop(0)
            total_tokens = sum(count_tokens(msg["content"]) for msg in messages)
    
    return messages

Асинхронная обработка

Для высоконагруженных ботов стоит использовать очереди:

import asyncio
from asyncio import Queue

class MessageProcessor:
    def __init__(self):
        self.queue = Queue()
        self.workers = []
    
    async def start_workers(self, worker_count=3):
        for i in range(worker_count):
            worker = asyncio.create_task(self.worker())
            self.workers.append(worker)
    
    async def worker(self):
        while True:
            message = await self.queue.get()
            try:
                await self.process_message(message)
            except Exception as e:
                logger.error(f"Worker error: {e}")
            finally:
                self.queue.task_done()

Мониторинг и аналитика

Базовые метрики для отслеживания:

  • Количество активных пользователей
  • Среднее время ответа GPT
  • Количество ошибок API
  • Потребление токенов
import time
from dataclasses import dataclass

@dataclass
class BotMetrics:
    total_messages: int = 0
    total_errors: int = 0
    total_tokens_used: int = 0
    avg_response_time: float = 0.0

metrics = BotMetrics()

async def track_request(func):
    start_time = time.time()
    try:
        result = await func()
        metrics.total_messages += 1
        return result
    except Exception as e:
        metrics.total_errors += 1
        raise
    finally:
        response_time = time.time() - start_time
        metrics.avg_response_time = (
            metrics.avg_response_time * (metrics.total_messages - 1) + response_time
        ) / metrics.total_messages

FAQ

Сколько стоит запустить такого бота? Оценочно: $5-20 в месяц на инфраструктуру + расходы на OpenAI API (зависят от нагрузки). Для MVP с небольшой аудиторией хватит $10-30 в месяц.

Какие альтернативы OpenAI можно использовать? Anthropic Claude, Google PaLM, локальные модели через Ollama или HuggingFace. Главное — адаптировать код под API конкретного провайдера.

Как масштабировать бота на тысячи пользователей? Добавить load balancer, несколько инстансов бота, внешнюю очередь (RabbitMQ/Kafka), кеширование частых запросов и мониторинг производительности.

Нужно ли получать разрешения для коммерческого использования? Проверьте ToS Telegram Bot API и OpenAI. Обычно для обычных ботов ограничений нет, но есть лимиты по частоте запросов.

Как обеспечить безопасность пользовательских данных? Шифруйте чувствительные данные в БД, используйте HTTPS для всех соединений, логируйте минимум персональной информации, регулярно очищайте старые сообщения.

Нужна помощь с разработкой Telegram-бота или интеграцией AI? Напишите мне — обсудим ваш проект.

Обсудить проект

Есть идея или задача? Давайте обсудим, как можно её реализовать с помощью современных AI-технологий.

Написать мне
Вернуться к блогу