import logging import os from posix import system from this import s from typing import Any, Dict, Optional from openai import OpenAI from pydantic import ValidationError from .models import ExtractionResult, FilterResult from .prompts import ( EXTRACTION_SYSTEM_PROMPT, NOISE_FILTER_SYSTEM_PROMPT, QUERY_ANSWER_SYSTEM_PROMPT, ) logger = logging.getLogger(__name__) class LLMProcessor: def __init__( self, api_key: Optional[str] = None, base_url: Optional[str] = None, model: Optional[str] = None, ): """ Initializes the LLMProcessor. :param api_key: OpenAI API key. If None, it looks for OPENAI_API_KEY in environment variables. :param base_url: OpenAI-compatible base URL (e.g., for vLLM). :param model: The model to use for processing. If None, it looks for LLM_MODEL in environment variables. """ backend = os.environ.get("LLM_BACKEND", "openai").lower() if backend == "ollama": # Ollama's OpenAI-compatible API final_base_url = base_url or "http://localhost:11434/v1" final_api_key = api_key or "ollama" elif backend == "vllm": # Remote vLLM server final_base_url = base_url or os.environ.get("OPENAI_BASE_URL") final_api_key = api_key or os.environ.get("OPENAI_API_KEY") else: # default to openai final_base_url = base_url or os.environ.get("OPENAI_BASE_URL") final_api_key = api_key or os.environ.get("OPENAI_API_KEY") try: self.client = OpenAI( api_key=final_api_key, base_url=final_base_url, ) # Simple connectivity check for local backends if backend == "ollama": # We can't easily check connectivity without making a call, # but we can ensure the client is initialized. pass except Exception as e: logger.error(f"Error initializing LLM client for backend {backend}: {e}") raise self.model = model or os.environ.get("LLM_MODEL", "gpt-4o") def _call_llm( self, system_prompt: str, user_prompt: str, context: Optional[str] = None, response_format: Optional[Any] = None, ) -> str: """ Generic method to call the LLM. """ messages = [ {"role": "system", "content": system_prompt}, ] if context: messages.append( { "role": "system", "content": f"Context from previous conversation:\n{context}", } ) messages.append({"role": "user", "content": user_prompt}) try: response = self.client.chat.completions.create( model=self.model, messages=messages, response_format=response_format, extra_body={"enable_thinking": False}, ) content = response.choices[0].message.content # Strip markdown code blocks if present if content.startswith("```"): import re content = re.sub( r"^```(?:json)?\n?|```$", "", content, flags=re.MULTILINE ).strip() return content except Exception as e: logger.error(f"LLM Error: {e}") return "" def generate_answer(self, query: str, context: str) -> str: """ Generates a natural language answer to a DM query. """ return self._call_llm( QUERY_ANSWER_SYSTEM_PROMPT, query, context=context, ) def filter_transcript( self, text: str, context: Optional[str] = None ) -> FilterResult: """ Stage 1: Raw Transcript -> Filtered Text. """ result = self._call_llm( NOISE_FILTER_SYSTEM_PROMPT, text, context=context, response_format={"type": "json_object"}, ) logger.info(f"LLM Processor (Filter): {text} -> {result}") import json try: data = json.loads(result) return FilterResult(**data) except (json.JSONDecodeError, ValidationError) as e: logger.error(f"Filter Parsing Error: {e}") return FilterResult(contextual_info="", filtered_text=result) def extract_structured_data( self, filtered_text: str, context: Optional[str] = None ) -> ExtractionResult: """ Stage 2: Filtered Text -> Structured Data. """ logger.info(f"LLM Processor (Extract): Calling extraction for: {filtered_text}") try: # Using standard chat.completions.create with JSON mode for better compatibility with vLLM logger.info("LLM Processor (Extract): Sending request to backend...") system_prompt = EXTRACTION_SYSTEM_PROMPT if context: system_prompt += f"\n{context}" messages = [ {"role": "system", "content": system_prompt}, ] messages.append({"role": "user", "content": filtered_text}) for message in messages: logger.info(f"LLM Processor (Extract): Message: {message}") response = self.client.chat.completions.create( model=self.model, messages=messages, response_format={"type": "json_object"}, extra_body={"enable_thinking": False}, ) logger.info("LLM Processor (Extract): Response received from backend.") import json content = response.choices[0].message.content logger.info(f"LLM Processor (Extract): Raw JSON response: {content}") data = json.loads(content) # Map the JSON data to the Pydantic model return ExtractionResult(**data) except Exception as e: logger.error(f"Extraction Error: {e}") # Return an empty ExtractionResult if parsing fails return ExtractionResult()