TL;DR
RAG-пайплайн для корпоративных документов — это система, которая позволяет LLM отвечать на вопросы, используя актуальную информацию из ваших внутренних документов. Ключевые этапы: chunking документов, создание эмбеддингов, векторный поиск и генерация ответов. При грамотной настройке retrieval, метаданных и проверки источников такая система снижает ручной поиск и делает ответы LLM проверяемыми.
Зачем нужен RAG для корпоративных документов
Типовая ситуация в компаниях: внутренние регламенты, инструкции, базы знаний и техническая документация лежат в разных системах, а сотрудники тратят время на ручной поиск. Обычные LLM знают только то, на чём обучались, а ваши корпоративные данные им недоступны. RAG решает эту проблему: модель отвечает на основе найденных фрагментов и может показывать источники.
Основные преимущества RAG:
- Актуальная информация без переобучения модели
- Прозрачность источников (знаете, откуда взят ответ)
- Контроль над данными (всё остаётся внутри компании)
- Масштабируемость (легко добавлять новые документы)
Архитектура RAG-пайплайна
Компоненты системы
graph TD
A[Документы] --> B[Preprocessing]
B --> C[Chunking]
C --> D[Embedding Model]
D --> E[Vector DB]
F[User Query] --> G[Query Embedding]
G --> H[Similarity Search]
E --> H
H --> I[Context Retrieval]
I --> J[LLM + Prompt]
J --> K[Response]
Основные этапы:
- Ingestion Pipeline — загрузка и обработка документов
- Embedding Pipeline — создание векторных представлений
- Retrieval Pipeline — поиск релевантных фрагментов
- Generation Pipeline — формирование ответа
Preprocessing и chunking документов
Извлечение текста
В корпоративной среде сталкиваешься с разными форматами. Вот проверенный стек:
import PyPDF2
from docx import Document
import pandas as pd
from bs4 import BeautifulSoup
def extract_text(file_path):
"""Универсальный экстрактор текста"""
if file_path.endswith('.pdf'):
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page in reader.pages:
text += page.extract_text()
return text
elif file_path.endswith('.docx'):
doc = Document(file_path)
return '\n'.join([paragraph.text for paragraph in doc.paragraphs])
elif file_path.endswith('.xlsx'):
df = pd.read_excel(file_path)
return df.to_string()
Стратегии chunking
Фиксированный размер (самый простой):
def fixed_chunk(text, chunk_size=1000, overlap=200):
chunks = []
for i in range(0, len(text), chunk_size - overlap):
chunk = text[i:i + chunk_size]
chunks.append(chunk)
return chunks
Семантический chunking (лучше для качества):
from sentence_transformers import SentenceTransformer
import numpy as np
def semantic_chunk(text, model, similarity_threshold=0.7):
sentences = text.split('.')
embeddings = model.encode(sentences)
chunks = []
current_chunk = [sentences[0]]
for i in range(1, len(sentences)):
similarity = np.dot(embeddings[i-1], embeddings[i])
if similarity > similarity_threshold:
current_chunk.append(sentences[i])
else:
chunks.append('.'.join(current_chunk))
current_chunk = [sentences[i]]
return chunks
Практические рекомендации по размеру чанков:
| Тип документа | Размер чанка | Overlap | Комментарий |
|---|---|---|---|
| Техническая документация | 800-1200 токенов | 100-200 | Сохраняет контекст процедур |
| Юридические документы | 500-800 токенов | 150-250 | Важна точность формулировок |
| FAQ и инструкции | 300-600 токенов | 50-100 | Короткие, самодостаточные блоки |
Выбор модели эмбеддингов
Сравнение популярных моделей
Тестировал разные модели на корпоративных данных одного клиента (техдокументация + регламенты):
| Модель | Размерность | Точность@10 | Скорость | Комментарий |
|---|---|---|---|---|
| text-embedding-ada-002 | 1536 | 78% | Быстро | Универсальная, хорошо для начала |
| sentence-transformers/all-MiniLM-L6-v2 | 384 | 72% | Очень быстро | Компактная, для больших объёмов |
| intfloat/multilingual-e5-large | 1024 | 82% | Средне | Лучшая для русского языка |
| cohere-embed-multilingual-v3.0 | 1024 | 80% | Быстро | Хорошее соотношение качество/цена |
Настройка эмбеддингов
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
class EmbeddingManager:
def __init__(self, model_name="intfloat/multilingual-e5-large"):
self.model = SentenceTransformer(model_name)
self.dimension = self.model.get_sentence_embedding_dimension()
def create_embeddings(self, chunks, batch_size=32):
"""Создание эмбеддингов с батчингом"""
embeddings = []
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
batch_embeddings = self.model.encode(batch, show_progress_bar=True)
embeddings.extend(batch_embeddings)
return np.array(embeddings)
Векторные базы данных
Выбор векторной БД
Для прототипирования:
- FAISS — быстро, просто, но без персистентности
- Chroma — легкий старт, встроенная персистентность
Для продакшена:
- Pinecone — managed решение, отличная производительность
- Weaviate — self-hosted, богатый функционал
- Qdrant — быстрый, хорошо документированный
Пример с Qdrant
from qdrant_client import QdrantClient
from qdrant_client.http import models
class VectorStore:
def __init__(self, collection_name="corporate_docs"):
self.client = QdrantClient("localhost", port=6333)
self.collection_name = collection_name
def create_collection(self, vector_size=1024):
"""Создание коллекции"""
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=models.VectorParams(
size=vector_size,
distance=models.Distance.COSINE
)
)
def upload_documents(self, embeddings, metadata):
"""Загрузка документов с метаданными"""
points = [
models.PointStruct(
id=idx,
vector=embedding.tolist(),
payload=meta
)
for idx, (embedding, meta) in enumerate(zip(embeddings, metadata))
]
self.client.upsert(
collection_name=self.collection_name,
points=points
)
def search(self, query_vector, top_k=5):
"""Поиск похожих документов"""
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
limit=top_k
)
return results
Оптимизация поиска
Гибридный поиск
Комбинируем векторный поиск с классическим BM25:
from rank_bm25 import BM25Okapi
import numpy as np
class HybridRetriever:
def __init__(self, vector_store, documents):
self.vector_store = vector_store
self.documents = documents
# Подготовка BM25
tokenized_docs = [doc.lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
def search(self, query, top_k=10, alpha=0.7):
"""Гибридный поиск с весами"""
# Векторный поиск
query_embedding = self.embedding_model.encode([query])
vector_results = self.vector_store.search(query_embedding[0], top_k=20)
# BM25 поиск
tokenized_query = query.lower().split()
bm25_scores = self.bm25.get_scores(tokenized_query)
# Нормализация и комбинирование
vector_scores = {r.id: r.score for r in vector_results}
combined_scores = {}
for doc_id in range(len(self.documents)):
vector_score = vector_scores.get(doc_id, 0)
bm25_score = bm25_scores[doc_id] if doc_id < len(bm25_scores) else 0
# Нормализация
bm25_score = bm25_score / (max(bm25_scores) + 1e-8)
combined_scores[doc_id] = alpha * vector_score + (1 - alpha) * bm25_score
# Сортировка и возврат топ-k
sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_results[:top_k]
Фильтрация по метаданным
def filtered_search(self, query_vector, filters, top_k=5):
"""Поиск с фильтрами по метаданным"""
# Конструируем фильтр Qdrant
filter_conditions = []
if filters.get('document_type'):
filter_conditions.append(
models.FieldCondition(
key="document_type",
match=models.MatchValue(value=filters['document_type'])
)
)
if filters.get('department'):
filter_conditions.append(
models.FieldCondition(
key="department",
match=models.MatchValue(value=filters['department'])
)
)
query_filter = models.Filter(must=filter_conditions) if filter_conditions else None
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
query_filter=query_filter,
limit=top_k
)
return results
Генерация ответов
Промпт-инжиниринг для RAG
def create_rag_prompt(query, context_chunks, max_context_length=3000):
"""Создание промпта с контекстом"""
# Ограничиваем длину контекста
context = ""
current_length = 0
for chunk in context_chunks:
chunk_text = f"Документ: {chunk['metadata']['title']}\n{chunk['text']}\n\n"
if current_length + len(chunk_text) > max_context_length:
break
context += chunk_text
current_length += len(chunk_text)
prompt = f"""
Ты — ассистент по корпоративной документации. Отвечай точно на основе предоставленных документов.
КОНТЕКСТ:
{context}
ВОПРОС: {query}
ИНСТРУКЦИИ:
1. Используй только информацию из предоставленного контекста
2. Если информации недостаточно, так и скажи
3. Указывай источники (названия документов)
4. Отвечай структурированно и понятно
ОТВЕТ:
"""
return prompt
Пост-обработка ответов
def post_process_response(self, response, source_chunks):
"""Добавление ссылок на источники"""
# Извлекаем уникальные источники
sources = set()
for chunk in source_chunks:
if chunk['score'] > 0.7: # Только релевантные источники
sources.add(chunk['metadata']['title'])
# Добавляем источники к ответу
if sources:
sources_text = "\n\nИсточники:\n" + "\n".join([f"• {source}" for source in sources])
response += sources_text
return response
Мониторинг и улучшение качества
Метрики качества
class RAGEvaluator:
def __init__(self):
self.metrics = {
'retrieval_accuracy': [],
'answer_relevance': [],
'source_precision': []
}
def evaluate_retrieval(self, query, retrieved_docs, ground_truth_docs):
"""Оценка качества поиска"""
retrieved_ids = set([doc['id'] for doc in retrieved_docs])
ground_truth_ids = set(ground_truth_docs)
precision = len(retrieved_ids & ground_truth_ids) / len(retrieved_ids)
recall = len(retrieved_ids & ground_truth_ids) / len(ground_truth_ids)
return {
'precision': precision,
'recall': recall,
'f1': 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
}
def log_query(self, query, response, user_feedback=None):
"""Логирование запросов для анализа"""
log_entry = {
'timestamp': datetime.now(),
'query': query,
'response': response,
'feedback': user_feedback
}
# Сохранение в БД или файл для последующего анализа
self.save_log(log_entry)
Практические советы по деплою
Кэширование
import redis
import hashlib
import json
class ResponseCache:
def __init__(self, redis_host='localhost', redis_port=6379, ttl=3600):
self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
self.ttl = ttl
def get_cache_key(self, query):
"""Создание ключа кэша"""
return f"rag_response:{hashlib.md5(query.encode()).hexdigest()}"
def get_cached_response(self, query):
"""Получение кэшированного ответа"""
key = self.get_cache_key(query)
cached = self.redis_client.get(key)
return json.loads(cached) if cached else None
def cache_response(self, query, response):
"""Кэширование ответа"""
key = self.get_cache_key(query)
self.redis_client.setex(key, self.ttl, json.dumps(response))
Масштабирование
Для высоких нагрузок:
- Используйте асинхронную обработку (FastAPI + async)
- Балансируйте нагрузку между несколькими инстансами
- Кэшируйте эмбеддинги и популярные запросы
- Рассмотрите GPU для больших моделей эмбеддингов
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.post("/ask")
async def ask_question(query: str):
# Проверяем кэш
cached = cache.get_cached_response(query)
if cached:
return cached
# Асинхронный поиск
search_task = asyncio.create_task(vector_store.search_async(query))
# Параллельная обработка
results = await search_task
# Генерация ответа
response = await generate_response_async(query, results)
# Кэширование
cache.cache_response(query, response)
return response
Типовые сценарии внедрения и ожидаемый эффект
Ниже — не выдуманные “кейсы клиентов”, а практические сценарии, с которыми обычно сталкиваются команды при внедрении RAG. Конкретные цифры зависят от качества документов, модели эмбеддингов, схемы chunking и дисциплины обновления индекса.
Сценарий 1: внутренние регламенты и инструкции
Задача: быстро находить ответы по политикам, процедурам, SLA и внутренним правилам.
Что важно настроить:
- chunking по смысловым секциям, а не только по фиксированному размеру;
- метаданные: подразделение, версия документа, дата обновления, тип регламента;
- обязательные ссылки на источники в каждом ответе;
- fallback: “не найдено достаточно данных”, если retrieval слабый.
Ожидаемый эффект: меньше ручного поиска и меньше ответов “на память”. Важно измерять это через реальные метрики: время поиска, процент ответов с источниками, оценку пользователей и количество эскалаций к экспертам.
Сценарий 2: техническая документация и база знаний разработки
Задача: помочь разработчикам быстрее находить архитектурные решения, API-контракты, runbook’и и правила code review.
Что важно настроить:
- отдельные парсеры для Markdown, Confluence/Jira, OpenAPI и кода;
- фильтрацию по проектам, командам и версиям;
- инкрементальное обновление индекса при изменении документации;
- регулярную проверку качества retrieval на наборе контрольных вопросов.
Ожидаемый эффект: быстрее onboarding, меньше повторных вопросов в чатах и выше переиспользование уже принятых архитектурных решений.
FAQ
Q: Какой размер чанка оптимальный для корпоративных документов? A: Зависит от типа документов. Для большинства случаев 800-1200 токенов с overlap 100-200 работают хорошо. Для коротких FAQ можно 300-500 токенов, для технической документации — до 1500.
Q: Стоит ли файн-тюнить модель эмбеддингов под свои данные? A: В 80% случаев не нужно. Современные multilingual модели хорошо работают out-of-the-box. Файн-тюнинг имеет смысл, если у вас очень специфическая терминология и большой размеченный датасет.
Q: Как обновлять индекс при изменении документов? A: Инкрементально. Ведите версионирование документов, при изменении удаляйте старые чанки и добавляйте новые. Полная переиндексация нужна редко.
Q: Какую векторную БД выбрать для продакшена? A: Если бюджет позволяет — Pinecone (managed). Если нужен self-hosted — Qdrant или Weaviate. Для простых случаев Chroma вполне подойдет.
Q: Как измерить качество RAG-системы? A: Основные метрики: точность поиска (precision@k), релевантность ответов (можно через LLM-as-judge), пользовательская оценка (лайки/дизлайки). Обязательно логируйте все запросы для анализа.
Нужна помощь с построением RAG-пайплайна для ваших корпоративных документов? Напишите мне — обсудим ваш проект.