Files
dnd-helpers/src/rag/manager.py
T
charles b25f82cefc Implement RAG summarization and context pipeline
- Add ContextPipeline for async RAG lookups
- Implement RAG result summarization via LLMProcessor
- Add CLI flag for PDF ingestion
- Strip markdown code blocks from LLM responses
- Update TUI context display to use ListItems
2026-05-27 00:17:47 -07:00

174 lines
6.2 KiB
Python

import json
import os
from typing import Any, List, Optional

import chromadb
import pdfplumber
from llama_index.core import Document, Settings, StorageContext, VectorStoreIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.vector_stores.chroma import ChromaVectorStore

from src.llm.models import ContextUpdate
from src.llm.processor import LLMProcessor
class RAGManager:
    """Ingest documents into, and retrieve snippets from, a Chroma-backed index.

    The manager owns a persistent ChromaDB client, wraps one collection in a
    ``ChromaVectorStore`` and exposes a ``VectorStoreIndex`` over it.
    ``self.index`` is ``None`` when loading the index from the vector store
    failed; the ingest methods (re)create it.
    """

    def __init__(self, persist_dir: str = "data/rag_index"):
        """Open (or create) the persistent collection and try to load its index.

        Args:
            persist_dir: Directory handed to ``chromadb.PersistentClient``.
        """
        self.persist_dir = persist_dir
        self.db = chromadb.PersistentClient(path=self.persist_dir)
        self.collection_name = "phb_collection"

        # Initialize Chroma Vector Store
        self.vector_store = ChromaVectorStore(
            chroma_collection=self.db.get_or_create_collection(self.collection_name)
        )

        # Initialize Storage Context
        self.storage_context = StorageContext.from_defaults(
            vector_store=self.vector_store
        )

        # Use a local HuggingFace embedding model to avoid API key issues during verification.
        # NOTE: this assigns an attribute on the imported ``Settings`` object, so
        # it is not scoped to this instance.
        Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

        # Load index if it exists, otherwise leave it unset until ingestion.
        # Loading stays best-effort, but the reason for a failure is reported
        # instead of being silently discarded.
        try:
            self.index = VectorStoreIndex.from_vector_store(
                self.vector_store, storage_context=self.storage_context
            )
        except Exception as exc:
            print(f"Could not load existing index from {self.persist_dir}: {exc}")
            self.index = None

    def ingest_pdf(self, pdf_path: str):
        """Parse a PDF page by page and store the page embeddings in ChromaDB.

        Each page with extractable text becomes one ``Document`` whose metadata
        records the 1-based page number. If no page yields text, a message is
        printed and the index is left untouched.

        Args:
            pdf_path: Path of the PDF file to ingest.
        """
        documents = []
        with pdfplumber.open(pdf_path) as pdf:
            for page_number, page in enumerate(pdf.pages, start=1):
                text = page.extract_text()
                if not text:
                    continue
                # Create a document for each page.
                # In a real scenario, we might use a recursive character splitter
                # but for PHB, page-level chunking is a good start.
                documents.append(
                    Document(
                        text=text,
                        metadata={
                            "source": f"PHB p. {page_number}",
                            "page": page_number,
                        },
                    )
                )

        if not documents:
            print(f"No text extracted from {pdf_path}")
            return

        # Create index from documents
        self.index = VectorStoreIndex.from_documents(
            documents, storage_context=self.storage_context
        )
        print(f"Successfully ingested {pdf_path} into the vector store.")

    def ingest_file(self, file_path: str):
        """Load a single UTF-8 text (markdown) file into the index.

        The file's base name is stored as the document's ``source`` metadata.

        Args:
            file_path: Path of the file to ingest.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            text = f.read()

        # Use the filename as the source
        source = os.path.basename(file_path)
        doc = Document(text=text, metadata={"source": source})

        if self.index is None:
            # If index doesn't exist, initialize it
            self.index = VectorStoreIndex.from_documents(
                [doc], storage_context=self.storage_context
            )
        else:
            # Insert into existing index
            self.index.insert(doc)
        print(f"Successfully ingested {file_path} into the vector store.")

    def summarize_results(self, query: str, nodes: List[Any]) -> "List[ContextUpdate]":
        """Turn raw retrieved snippets into concise, query-relevant insights.

        The snippets are sent to an ``LLMProcessor`` together with the query;
        the reply is expected to be JSON of the form
        ``{"insights": [{"snippet": "...", "source": "..."}, ...]}``.

        Args:
            query: The user's query the snippets were retrieved for.
            nodes: Retrieved nodes; each must expose ``.metadata`` (a mapping)
                and ``.text``.

        Returns:
            One ``ContextUpdate`` per well-formed insight. An empty list when
            ``nodes`` is empty, the reply cannot be parsed, or nothing is
            relevant.
        """
        if not nodes:
            return []

        processor = LLMProcessor()

        # Construct the context from retrieved nodes
        context_text = "\n\n".join(
            f"Source: {node.metadata.get('source', 'Unknown')}\nContent: {node.text}"
            for node in nodes
        )

        system_prompt = (
            "You are a precise research assistant. Your task is to analyze provided text snippets "
            "and extract only the information that is directly relevant to the user's query. "
            "1. If a snippet is irrelevant to the query, discard it completely. "
            "2. For relevant information, synthesize it into a concise, single-sentence 'insight'. "
            "3. Do not simply repeat the raw text; summarize it for clarity and brevity. "
            "4. If no snippets are relevant to the query, return an empty list. "
            "5. Be factual and do not hallucinate. Use only the provided snippets."
        )
        user_prompt = (
            f"Query: {query}\n\n"
            f"Snippets:\n{context_text}\n\n"
            "Return a JSON object with a key 'insights' containing a list of objects, each with 'snippet' and 'source'."
        )

        result = processor._call_llm(
            system_prompt,
            user_prompt,
            response_format={"type": "json_object"},
        )
        return self._parse_insights(query, result)

    @staticmethod
    def _parse_insights(query: str, raw: Any) -> "List[ContextUpdate]":
        """Parse the LLM's JSON reply into ``ContextUpdate`` objects.

        Accepts either ``{"insights": [...]}`` or a bare JSON list. Malformed
        individual items are skipped so that one bad entry does not discard
        the valid ones.

        Args:
            query: Query to attach to every resulting ``ContextUpdate``.
            raw: The raw reply; anything ``json.loads`` rejects yields ``[]``.

        Returns:
            The list of successfully built ``ContextUpdate`` objects.
        """
        try:
            data = json.loads(raw)
        except (json.JSONDecodeError, TypeError) as e:
            # TypeError covers a non-string reply such as None.
            print(f"Summarization parsing error: {e}")
            return []

        # Expecting a format like {"insights": [{"snippet": "...", "source": "..."}, ...]}
        insights = data.get("insights", []) if isinstance(data, dict) else data
        if not insights:
            print(f"Summarization: No relevant insights found for query: {query}")
            return []
        if not isinstance(insights, list):
            print(
                "Summarization parsing error: expected a list of insights, "
                f"got {type(insights).__name__}"
            )
            return []

        updates = []
        for item in insights:
            if not isinstance(item, dict) or "snippet" not in item or "source" not in item:
                print(f"Summarization: skipping malformed insight: {item!r}")
                continue
            try:
                update = ContextUpdate(
                    query=query, snippet=item["snippet"], source=item["source"]
                )
            except (TypeError, ValueError) as e:
                # NOTE(review): assumes ContextUpdate signals bad field values
                # with TypeError/ValueError -- confirm against its definition.
                print(f"Summarization: skipping malformed insight: {e}")
                continue
            updates.append(update)
        return updates

    def retrieve(
        self, query: str, top_k: int = 5, summarize: bool = False
    ) -> "List[ContextUpdate]":
        """Retrieve the top-K most relevant snippets for a query.

        Args:
            query: Text to search the index for.
            top_k: Number of results requested from the retriever.
            summarize: When true, pass the retrieved nodes through
                ``summarize_results`` instead of returning raw snippets.

        Returns:
            A list of ``ContextUpdate`` objects; empty (with a printed hint)
            when no index has been loaded or built yet.
        """
        if self.index is None:
            print("Index not initialized. Please ingest documents first.")
            return []

        # Create a retriever
        retriever = self.index.as_retriever(similarity_top_k=top_k)
        nodes = retriever.retrieve(query)

        if summarize:
            return self.summarize_results(query, nodes)

        return [
            ContextUpdate(
                query=query,
                snippet=node.text,
                source=node.metadata.get("source", "Unknown Source"),
            )
            for node in nodes
        ]