TL;DR: За неделю можно собрать полноценный Telegram-бот с GPT — от базового MVP до продакшен-готового решения. Главное — правильная архитектура, асинхронность и грамотная обработка ошибок. Покажу конкретный стек и подводные камни.
Архитектура: что нужно для старта
Минимальный стек для Telegram-бота с GPT выглядит просто:
- Python 3.9+ с aiogram для Telegram API
- OpenAI API или другой LLM-провайдер
- PostgreSQL для хранения контекста
- Redis для кеширования и rate limiting
- Docker для деплоя
Типовая архитектура включает три основных компонента:
- Bot Handler — обрабатывает сообщения от Telegram
- LLM Service — работает с GPT API
- Context Manager — управляет историей диалогов
# Базовая структура бота
from aiogram import Bot, Dispatcher, types
from aiogram.contrib.fsm_storage.redis import RedisStorage2
import openai
class ChatBot:
def __init__(self):
self.bot = Bot(token=TELEGRAM_TOKEN)
self.storage = RedisStorage2('redis://localhost')
self.dp = Dispatcher(self.bot, storage=self.storage)
async def process_message(self, message: types.Message):
# Получаем контекст
context = await self.get_context(message.from_user.id)
# Отправляем в GPT
response = await self.get_gpt_response(message.text, context)
# Сохраняем и отвечаем
await self.save_context(message.from_user.id, message.text, response)
await message.reply(response)
День 1-2: MVP и базовая функциональность
Первые два дня уходят на создание базового функционала. Главное — не переусложнять.
Обработка сообщений
@dp.message_handler()
async def handle_message(message: types.Message):
try:
# Показываем typing
await bot.send_chat_action(message.chat.id, 'typing')
# Базовая валидация
if len(message.text) > 4000:
await message.reply("Сообщение слишком длинное")
return
# Получаем ответ от GPT
response = await get_gpt_response(message.text)
await message.reply(response)
except Exception as e:
logger.error(f"Error processing message: {e}")
await message.reply("Произошла ошибка, попробуйте позже")
Интеграция с OpenAI
async def get_gpt_response(user_message: str, context: list = None):
messages = [
{"role": "system", "content": "Ты полезный помощник"}
]
if context:
messages.extend(context)
messages.append({"role": "user", "content": user_message})
response = await openai.ChatCompletion.acreate(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=1000,
temperature=0.7
)
return response.choices[0].message.content
День 3-4: контекст и персистентность
На этом этапе добавляем память бота — без неё диалог получается рваным.
Управление контекстом
class ContextManager:
def __init__(self, redis_client, max_context_length=10):
self.redis = redis_client
self.max_length = max_context_length
async def get_context(self, user_id: int) -> list:
context_key = f"context:{user_id}"
context_data = await self.redis.get(context_key)
if context_data:
return json.loads(context_data)
return []
async def add_message(self, user_id: int, role: str, content: str):
context = await self.get_context(user_id)
context.append({"role": role, "content": content})
# Ограничиваем длину контекста
if len(context) > self.max_length:
context = context[-self.max_length:]
context_key = f"context:{user_id}"
await self.redis.setex(
context_key,
3600, # TTL 1 час
json.dumps(context)
)
База данных для долгосрочного хранения
# models.py
from sqlalchemy import Column, Integer, Text, DateTime, BigInteger
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class ChatMessage(Base):
__tablename__ = 'chat_messages'
id = Column(Integer, primary_key=True)
user_id = Column(BigInteger, nullable=False)
message_text = Column(Text, nullable=False)
bot_response = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
День 5-6: продакшен-готовность
Здесь фокус на надёжности и производительности.
Rate limiting и защита от спама
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram.utils.exceptions import Throttled
class ThrottlingMiddleware(BaseMiddleware):
def __init__(self, limit=1, key_prefix='antiflood_'):
self.rate_limit = limit
self.prefix = key_prefix
super().__init__()
async def on_process_message(self, message: types.Message, data: dict):
dispatcher = Dispatcher.get_current()
try:
await dispatcher.throttle(
self.prefix + str(message.from_user.id),
rate=self.rate_limit
)
except Throttled:
await message.reply("Не так быстро! Подождите немного.")
raise
Обработка ошибок и мониторинг
import logging
from functools import wraps
def handle_errors(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except openai.RateLimitError:
await args[0].reply("Превышен лимит запросов, попробуйте позже")
except openai.InvalidRequestError as e:
logger.error(f"Invalid OpenAI request: {e}")
await args[0].reply("Некорректный запрос")
except Exception as e:
logger.error(f"Unexpected error: {e}")
await args[0].reply("Произошла ошибка")
return wrapper
@handle_errors
async def process_gpt_request(message: types.Message):
# Основная логика
pass
День 7: деплой и масштабирование
Docker-контейнеризация
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "bot.py"]
docker-compose для локальной разработки
version: '3.8'
services:
bot:
build: .
environment:
- TELEGRAM_TOKEN=${TELEGRAM_TOKEN}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
- postgres
redis:
image: redis:alpine
postgres:
image: postgres:13
environment:
POSTGRES_DB: botdb
POSTGRES_USER: botuser
POSTGRES_PASSWORD: botpass
Подводные камни и оптимизации
Управление токенами
GPT API считает токены, а не символы. Для контроля расходов нужно отслеживать потребление:
import tiktoken
def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
async def check_token_limit(messages: list, max_tokens: int = 3000):
total_tokens = sum(count_tokens(msg["content"]) for msg in messages)
if total_tokens > max_tokens:
# Обрезаем старые сообщения
while total_tokens > max_tokens and len(messages) > 1:
messages.pop(0)
total_tokens = sum(count_tokens(msg["content"]) for msg in messages)
return messages
Асинхронная обработка
Для высоконагруженных ботов стоит использовать очереди:
import asyncio
from asyncio import Queue
class MessageProcessor:
def __init__(self):
self.queue = Queue()
self.workers = []
async def start_workers(self, worker_count=3):
for i in range(worker_count):
worker = asyncio.create_task(self.worker())
self.workers.append(worker)
async def worker(self):
while True:
message = await self.queue.get()
try:
await self.process_message(message)
except Exception as e:
logger.error(f"Worker error: {e}")
finally:
self.queue.task_done()
Мониторинг и аналитика
Базовые метрики для отслеживания:
- Количество активных пользователей
- Среднее время ответа GPT
- Количество ошибок API
- Потребление токенов
import time
from dataclasses import dataclass
@dataclass
class BotMetrics:
total_messages: int = 0
total_errors: int = 0
total_tokens_used: int = 0
avg_response_time: float = 0.0
metrics = BotMetrics()
async def track_request(func):
start_time = time.time()
try:
result = await func()
metrics.total_messages += 1
return result
except Exception as e:
metrics.total_errors += 1
raise
finally:
response_time = time.time() - start_time
metrics.avg_response_time = (
metrics.avg_response_time * (metrics.total_messages - 1) + response_time
) / metrics.total_messages
FAQ
Сколько стоит запустить такого бота? Оценочно: $5-20 в месяц на инфраструктуру + расходы на OpenAI API (зависят от нагрузки). Для MVP с небольшой аудиторией хватит $10-30 в месяц.
Какие альтернативы OpenAI можно использовать? Anthropic Claude, Google PaLM, локальные модели через Ollama или HuggingFace. Главное — адаптировать код под API конкретного провайдера.
Как масштабировать бота на тысячи пользователей? Добавить load balancer, несколько инстансов бота, внешнюю очередь (RabbitMQ/Kafka), кеширование частых запросов и мониторинг производительности.
Нужно ли получать разрешения для коммерческого использования? Проверьте ToS Telegram Bot API и OpenAI. Обычно для обычных ботов ограничений нет, но есть лимиты по частоте запросов.
Как обеспечить безопасность пользовательских данных? Шифруйте чувствительные данные в БД, используйте HTTPS для всех соединений, логируйте минимум персональной информации, регулярно очищайте старые сообщения.
Нужна помощь с разработкой Telegram-бота или интеграцией AI? Напишите мне — обсудим ваш проект.