Как построить RAG-пайплайн для корпоративных документов: практическое руководство

AI и LLM
Как построить RAG-пайплайн для корпоративных документов: практическое руководство

TL;DR

RAG-пайплайн для корпоративных документов — это система, которая позволяет LLM отвечать на вопросы, используя актуальную информацию из ваших внутренних документов. Ключевые этапы: chunking документов, создание эмбеддингов, векторный поиск и генерация ответов. При грамотной настройке retrieval, метаданных и проверки источников такая система снижает ручной поиск и делает ответы LLM проверяемыми.

Зачем нужен RAG для корпоративных документов

Типовая ситуация в компаниях: внутренние регламенты, инструкции, базы знаний и техническая документация лежат в разных системах, а сотрудники тратят время на ручной поиск. Обычные LLM знают только то, на чём обучались, а ваши корпоративные данные им недоступны. RAG решает эту проблему: модель отвечает на основе найденных фрагментов и может показывать источники.

Основные преимущества RAG:

  • Актуальная информация без переобучения модели
  • Прозрачность источников (знаете, откуда взят ответ)
  • Контроль над данными (всё остаётся внутри компании)
  • Масштабируемость (легко добавлять новые документы)

Архитектура RAG-пайплайна

Компоненты системы

graph TD
    A[Документы] --> B[Preprocessing]
    B --> C[Chunking]
    C --> D[Embedding Model]
    D --> E[Vector DB]
    F[User Query] --> G[Query Embedding]
    G --> H[Similarity Search]
    E --> H
    H --> I[Context Retrieval]
    I --> J[LLM + Prompt]
    J --> K[Response]

Основные этапы:

  1. Ingestion Pipeline — загрузка и обработка документов
  2. Embedding Pipeline — создание векторных представлений
  3. Retrieval Pipeline — поиск релевантных фрагментов
  4. Generation Pipeline — формирование ответа

Preprocessing и chunking документов

Извлечение текста

В корпоративной среде сталкиваешься с разными форматами. Вот проверенный стек:

import PyPDF2
from docx import Document
import pandas as pd
from bs4 import BeautifulSoup

def extract_text(file_path):
    """Универсальный экстрактор текста"""
    if file_path.endswith('.pdf'):
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            text = ""
            for page in reader.pages:
                text += page.extract_text()
        return text
    
    elif file_path.endswith('.docx'):
        doc = Document(file_path)
        return '\n'.join([paragraph.text for paragraph in doc.paragraphs])
    
    elif file_path.endswith('.xlsx'):
        df = pd.read_excel(file_path)
        return df.to_string()

Стратегии chunking

Фиксированный размер (самый простой):

def fixed_chunk(text, chunk_size=1000, overlap=200):
    chunks = []
    for i in range(0, len(text), chunk_size - overlap):
        chunk = text[i:i + chunk_size]
        chunks.append(chunk)
    return chunks

Семантический chunking (лучше для качества):

from sentence_transformers import SentenceTransformer
import numpy as np

def semantic_chunk(text, model, similarity_threshold=0.7):
    sentences = text.split('.')
    embeddings = model.encode(sentences)
    
    chunks = []
    current_chunk = [sentences[0]]
    
    for i in range(1, len(sentences)):
        similarity = np.dot(embeddings[i-1], embeddings[i])
        
        if similarity > similarity_threshold:
            current_chunk.append(sentences[i])
        else:
            chunks.append('.'.join(current_chunk))
            current_chunk = [sentences[i]]
    
    return chunks

Практические рекомендации по размеру чанков:

Тип документаРазмер чанкаOverlapКомментарий
Техническая документация800-1200 токенов100-200Сохраняет контекст процедур
Юридические документы500-800 токенов150-250Важна точность формулировок
FAQ и инструкции300-600 токенов50-100Короткие, самодостаточные блоки

Выбор модели эмбеддингов

Сравнение популярных моделей

Тестировал разные модели на корпоративных данных одного клиента (техдокументация + регламенты):

МодельРазмерностьТочность@10СкоростьКомментарий
text-embedding-ada-002153678%БыстроУниверсальная, хорошо для начала
sentence-transformers/all-MiniLM-L6-v238472%Очень быстроКомпактная, для больших объёмов
intfloat/multilingual-e5-large102482%СреднеЛучшая для русского языка
cohere-embed-multilingual-v3.0102480%БыстроХорошее соотношение качество/цена

Настройка эмбеддингов

from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

class EmbeddingManager:
    def __init__(self, model_name="intfloat/multilingual-e5-large"):
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()
        
    def create_embeddings(self, chunks, batch_size=32):
        """Создание эмбеддингов с батчингом"""
        embeddings = []
        
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            batch_embeddings = self.model.encode(batch, show_progress_bar=True)
            embeddings.extend(batch_embeddings)
            
        return np.array(embeddings)

Векторные базы данных

Выбор векторной БД

Для прототипирования:

  • FAISS — быстро, просто, но без персистентности
  • Chroma — легкий старт, встроенная персистентность

Для продакшена:

  • Pinecone — managed решение, отличная производительность
  • Weaviate — self-hosted, богатый функционал
  • Qdrant — быстрый, хорошо документированный

Пример с Qdrant

from qdrant_client import QdrantClient
from qdrant_client.http import models

class VectorStore:
    def __init__(self, collection_name="corporate_docs"):
        self.client = QdrantClient("localhost", port=6333)
        self.collection_name = collection_name
        
    def create_collection(self, vector_size=1024):
        """Создание коллекции"""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=models.VectorParams(
                size=vector_size,
                distance=models.Distance.COSINE
            )
        )
    
    def upload_documents(self, embeddings, metadata):
        """Загрузка документов с метаданными"""
        points = [
            models.PointStruct(
                id=idx,
                vector=embedding.tolist(),
                payload=meta
            )
            for idx, (embedding, meta) in enumerate(zip(embeddings, metadata))
        ]
        
        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )
    
    def search(self, query_vector, top_k=5):
        """Поиск похожих документов"""
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=top_k
        )
        return results

Оптимизация поиска

Гибридный поиск

Комбинируем векторный поиск с классическим BM25:

from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    def __init__(self, vector_store, documents):
        self.vector_store = vector_store
        self.documents = documents
        
        # Подготовка BM25
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)
        
    def search(self, query, top_k=10, alpha=0.7):
        """Гибридный поиск с весами"""
        
        # Векторный поиск
        query_embedding = self.embedding_model.encode([query])
        vector_results = self.vector_store.search(query_embedding[0], top_k=20)
        
        # BM25 поиск
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        
        # Нормализация и комбинирование
        vector_scores = {r.id: r.score for r in vector_results}
        
        combined_scores = {}
        for doc_id in range(len(self.documents)):
            vector_score = vector_scores.get(doc_id, 0)
            bm25_score = bm25_scores[doc_id] if doc_id < len(bm25_scores) else 0
            
            # Нормализация
            bm25_score = bm25_score / (max(bm25_scores) + 1e-8)
            
            combined_scores[doc_id] = alpha * vector_score + (1 - alpha) * bm25_score
        
        # Сортировка и возврат топ-k
        sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_results[:top_k]

Фильтрация по метаданным

def filtered_search(self, query_vector, filters, top_k=5):
    """Поиск с фильтрами по метаданным"""
    
    # Конструируем фильтр Qdrant
    filter_conditions = []
    
    if filters.get('document_type'):
        filter_conditions.append(
            models.FieldCondition(
                key="document_type",
                match=models.MatchValue(value=filters['document_type'])
            )
        )
    
    if filters.get('department'):
        filter_conditions.append(
            models.FieldCondition(
                key="department",
                match=models.MatchValue(value=filters['department'])
            )
        )
    
    query_filter = models.Filter(must=filter_conditions) if filter_conditions else None
    
    results = self.client.search(
        collection_name=self.collection_name,
        query_vector=query_vector,
        query_filter=query_filter,
        limit=top_k
    )
    
    return results

Генерация ответов

Промпт-инжиниринг для RAG

def create_rag_prompt(query, context_chunks, max_context_length=3000):
    """Создание промпта с контекстом"""
    
    # Ограничиваем длину контекста
    context = ""
    current_length = 0
    
    for chunk in context_chunks:
        chunk_text = f"Документ: {chunk['metadata']['title']}\n{chunk['text']}\n\n"
        if current_length + len(chunk_text) > max_context_length:
            break
        context += chunk_text
        current_length += len(chunk_text)
    
    prompt = f"""
Ты — ассистент по корпоративной документации. Отвечай точно на основе предоставленных документов.

КОНТЕКСТ:
{context}

ВОПРОС: {query}

ИНСТРУКЦИИ:
1. Используй только информацию из предоставленного контекста
2. Если информации недостаточно, так и скажи
3. Указывай источники (названия документов)
4. Отвечай структурированно и понятно

ОТВЕТ:
"""
    
    return prompt

Пост-обработка ответов

def post_process_response(self, response, source_chunks):
    """Добавление ссылок на источники"""
    
    # Извлекаем уникальные источники
    sources = set()
    for chunk in source_chunks:
        if chunk['score'] > 0.7:  # Только релевантные источники
            sources.add(chunk['metadata']['title'])
    
    # Добавляем источники к ответу
    if sources:
        sources_text = "\n\nИсточники:\n" + "\n".join([f"• {source}" for source in sources])
        response += sources_text
    
    return response

Мониторинг и улучшение качества

Метрики качества

class RAGEvaluator:
    def __init__(self):
        self.metrics = {
            'retrieval_accuracy': [],
            'answer_relevance': [],
            'source_precision': []
        }
    
    def evaluate_retrieval(self, query, retrieved_docs, ground_truth_docs):
        """Оценка качества поиска"""
        retrieved_ids = set([doc['id'] for doc in retrieved_docs])
        ground_truth_ids = set(ground_truth_docs)
        
        precision = len(retrieved_ids & ground_truth_ids) / len(retrieved_ids)
        recall = len(retrieved_ids & ground_truth_ids) / len(ground_truth_ids)
        
        return {
            'precision': precision,
            'recall': recall,
            'f1': 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        }
    
    def log_query(self, query, response, user_feedback=None):
        """Логирование запросов для анализа"""
        log_entry = {
            'timestamp': datetime.now(),
            'query': query,
            'response': response,
            'feedback': user_feedback
        }
        
        # Сохранение в БД или файл для последующего анализа
        self.save_log(log_entry)

Практические советы по деплою

Кэширование

import redis
import hashlib
import json

class ResponseCache:
    def __init__(self, redis_host='localhost', redis_port=6379, ttl=3600):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.ttl = ttl
    
    def get_cache_key(self, query):
        """Создание ключа кэша"""
        return f"rag_response:{hashlib.md5(query.encode()).hexdigest()}"
    
    def get_cached_response(self, query):
        """Получение кэшированного ответа"""
        key = self.get_cache_key(query)
        cached = self.redis_client.get(key)
        return json.loads(cached) if cached else None
    
    def cache_response(self, query, response):
        """Кэширование ответа"""
        key = self.get_cache_key(query)
        self.redis_client.setex(key, self.ttl, json.dumps(response))

Масштабирование

Для высоких нагрузок:

  • Используйте асинхронную обработку (FastAPI + async)
  • Балансируйте нагрузку между несколькими инстансами
  • Кэшируйте эмбеддинги и популярные запросы
  • Рассмотрите GPU для больших моделей эмбеддингов
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.post("/ask")
async def ask_question(query: str):
    # Проверяем кэш
    cached = cache.get_cached_response(query)
    if cached:
        return cached
    
    # Асинхронный поиск
    search_task = asyncio.create_task(vector_store.search_async(query))
    
    # Параллельная обработка
    results = await search_task
    
    # Генерация ответа
    response = await generate_response_async(query, results)
    
    # Кэширование
    cache.cache_response(query, response)
    
    return response

Типовые сценарии внедрения и ожидаемый эффект

Ниже — не выдуманные “кейсы клиентов”, а практические сценарии, с которыми обычно сталкиваются команды при внедрении RAG. Конкретные цифры зависят от качества документов, модели эмбеддингов, схемы chunking и дисциплины обновления индекса.

Сценарий 1: внутренние регламенты и инструкции

Задача: быстро находить ответы по политикам, процедурам, SLA и внутренним правилам.

Что важно настроить:

  • chunking по смысловым секциям, а не только по фиксированному размеру;
  • метаданные: подразделение, версия документа, дата обновления, тип регламента;
  • обязательные ссылки на источники в каждом ответе;
  • fallback: “не найдено достаточно данных”, если retrieval слабый.

Ожидаемый эффект: меньше ручного поиска и меньше ответов “на память”. Важно измерять это через реальные метрики: время поиска, процент ответов с источниками, оценку пользователей и количество эскалаций к экспертам.

Сценарий 2: техническая документация и база знаний разработки

Задача: помочь разработчикам быстрее находить архитектурные решения, API-контракты, runbook’и и правила code review.

Что важно настроить:

  • отдельные парсеры для Markdown, Confluence/Jira, OpenAPI и кода;
  • фильтрацию по проектам, командам и версиям;
  • инкрементальное обновление индекса при изменении документации;
  • регулярную проверку качества retrieval на наборе контрольных вопросов.

Ожидаемый эффект: быстрее onboarding, меньше повторных вопросов в чатах и выше переиспользование уже принятых архитектурных решений.

FAQ

Q: Какой размер чанка оптимальный для корпоративных документов? A: Зависит от типа документов. Для большинства случаев 800-1200 токенов с overlap 100-200 работают хорошо. Для коротких FAQ можно 300-500 токенов, для технической документации — до 1500.

Q: Стоит ли файн-тюнить модель эмбеддингов под свои данные? A: В 80% случаев не нужно. Современные multilingual модели хорошо работают out-of-the-box. Файн-тюнинг имеет смысл, если у вас очень специфическая терминология и большой размеченный датасет.

Q: Как обновлять индекс при изменении документов? A: Инкрементально. Ведите версионирование документов, при изменении удаляйте старые чанки и добавляйте новые. Полная переиндексация нужна редко.

Q: Какую векторную БД выбрать для продакшена? A: Если бюджет позволяет — Pinecone (managed). Если нужен self-hosted — Qdrant или Weaviate. Для простых случаев Chroma вполне подойдет.

Q: Как измерить качество RAG-системы? A: Основные метрики: точность поиска (precision@k), релевантность ответов (можно через LLM-as-judge), пользовательская оценка (лайки/дизлайки). Обязательно логируйте все запросы для анализа.

Нужна помощь с построением RAG-пайплайна для ваших корпоративных документов? Напишите мне — обсудим ваш проект.

Обсудить проект

Есть идея или задача? Давайте обсудим, как можно её реализовать с помощью современных AI-технологий.

Написать мне
Вернуться к блогу