Improve audio capture and LLM integration

- Implement Silero VAD for dynamic audio chunking
- Add support for Ollama and vLLM backends
- Harden extraction prompts for strict JSON output
- Refactor TUI worker to handle proposals asynchronously
This commit is contained in:
2026-05-26 19:51:48 -07:00
parent 60e170e777
commit 58bab75bb5
11 changed files with 290 additions and 78 deletions
+2
View File
@@ -2,5 +2,7 @@
OPENAI_API_KEY=no-key-required
OPENAI_BASE_URL=https://vllm.tipsy.codes/v1
LLM_MODEL=Intel/gemma-4-31B-it-int4-AutoRound
#LLM_BACKEND=ollama
#LLM_MODEL=gemma:2b
WHISPER_MODEL=base
AUDIO_DEVICE_ID=None
Binary file not shown.
Binary file not shown.
+28 -2
View File
@@ -22,10 +22,34 @@ class LLMProcessor:
:param base_url: OpenAI-compatible base URL (e.g., for vLLM).
:param model: The model to use for processing. If None, it looks for LLM_MODEL in environment variables.
"""
backend = os.environ.get("LLM_BACKEND", "openai").lower()
if backend == "ollama":
# Ollama's OpenAI-compatible API
final_base_url = base_url or "http://localhost:11434/v1"
final_api_key = api_key or "ollama"
elif backend == "vllm":
# Remote vLLM server
final_base_url = base_url or os.environ.get("OPENAI_BASE_URL")
final_api_key = api_key or os.environ.get("OPENAI_API_KEY")
else: # default to openai
final_base_url = base_url or os.environ.get("OPENAI_BASE_URL")
final_api_key = api_key or os.environ.get("OPENAI_API_KEY")
try:
self.client = OpenAI(
api_key=api_key or os.environ.get("OPENAI_API_KEY"),
base_url=base_url or os.environ.get("OPENAI_BASE_URL"),
api_key=final_api_key,
base_url=final_base_url,
)
# Simple connectivity check for local backends
if backend == "ollama":
# We can't easily check connectivity without making a call,
# but we can ensure the client is initialized.
pass
except Exception as e:
print(f"Error initializing LLM client for backend {backend}: {e}")
raise
self.model = model or os.environ.get("LLM_MODEL", "gpt-4o")
def _call_llm(
@@ -67,6 +91,7 @@ class LLMProcessor:
print(f"LLM Processor (Extract): Calling extraction for: {filtered_text}")
try:
# Using standard chat.completions.create with JSON mode for better compatibility with vLLM
print("LLM Processor (Extract): Sending request to backend...")
response = self.client.chat.completions.create(
model=self.model,
messages=[
@@ -76,6 +101,7 @@ class LLMProcessor:
response_format={"type": "json_object"},
extra_body={"include_reasoning": False},
)
print("LLM Processor (Extract): Response received from backend.")
import json
+46 -5
View File
@@ -11,10 +11,51 @@ EXTRACTION_SYSTEM_PROMPT = """
You are a D&D session analyzer. Your goal is to extract structured data from a filtered transcript.
Extract any changes to character states (HP, status effects, inventory) and any new lore facts (NPCs, locations, world-building).
Guidelines:
1. Lore: Identify any new information about the world, people, and places.
2. Character State: Look for mentions of damage, healing, or items being gained or lost.
3. Events: Note significant plot developments.
DO NOT THINK.
Be precise. If no relevant information is found, return empty lists.
CONSTRAINTS:
- OUTPUT ONLY VALID JSON.
- DO NOT include any commentary, explanations, or "thought" blocks.
- DO NOT include any keys other than "lore", "character_state", and "events".
- If no relevant information is found, return empty lists for all keys.
- If a character name is not specified (e.g., "Your character"), use "Player Character".
Strict Output Format:
Return a JSON object with exactly these keys:
1. "lore": A list of objects. Each object MUST have:
- "category": (string) 'NPC', 'Location', 'WorldBuilding', or 'Plot'
- "entity_name": (string) The name of the NPC, Location, or entity
- "content": (string) The actual lore fact or description
2. "character_state": A list of objects. Each object MUST have:
- "character_name": (string) Name of the character
- "hp_change": (integer, optional) Change in HP
- "status_effects_added": (list of strings)
- "status_effects_removed": (list of strings)
- "inventory_changes": (list of objects with "item", "quantity", "action")
3. "events": A list of strings. Each string should be a concise description of a significant plot development.
Example Output:
{
"lore": [
{
"category": "NPC",
"entity_name": "Thorne",
"content": "A gruff dwarf who runs the local tavern."
}
],
"character_state": [
{
"character_name": "Grog",
"hp_change": -10,
"status_effects_added": [],
"status_effects_removed": [],
"inventory_changes": []
}
],
"events": [
"The party discovered the secret entrance to the crypt."
]
}
Be precise. Return only the JSON object.
"""
+4 -30
View File
@@ -88,40 +88,14 @@ class PipelineOrchestrator:
Worker that handles TUI: Proposal -> Persistence.
"""
logger.info("TUI Worker started.")
while self.is_running:
try:
# Get proposal from queue
result = await self.proposal_queue.get()
logger.info("Proposal received. Launching TUI for confirmation.")
# Launch TUI (Note: Textual's run() is blocking)
# We need to run the TUI in a way that doesn't block the overall event loop
# or we accept that the system pauses for confirmation.
# Given the requirement for "Non-blocking", but TUI is a focus-modal,
# we launch it.
# To integrate Textual with asyncio, we can use its async support.
# However, ConfirmationApp is designed as a standard Textual app.
# Since we want to bridge the asyncio loop, we'll run the TUI.
# Note: In a real high-performance pipeline, we'd use an async TUI
# that updates widgets in real-time. For now, we follow the
# a confirmation screen pattern.
# we will use the run() method, but since we are in an async loop,
# we might need to wrap it or use an async variant.
# For this integration, we'll use the run() method as defined
# in ConfirmationApp, which will take over the terminal.
ConfirmationApp(result).run()
# Launch TUI exactly once.
# Pass the proposal queue to the app.
app = ConfirmationApp(proposal_queue=self.proposal_queue)
await app.run_async()
except Exception as e:
logger.error(f"TUI Worker error: {e}")
# Small sleep
await asyncio.sleep(0.1)
async def run(self):
"""
Starts the pipeline workers and the audio listener.
Binary file not shown.
+115 -22
View File
@@ -3,6 +3,7 @@ import logging
import numpy as np
import sounddevice as sd
import torch
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -11,16 +12,56 @@ logger = logging.getLogger(__name__)
class AudioListener:
"""
Captures audio from the microphone in chunks and puts them into an asyncio queue.
Uses Silero VAD for dynamic chunking based on speech detection.
"""
def __init__(self, sample_rate=16000, chunk_duration=3, device=None, loop=None):
def __init__(
self,
sample_rate=16000,
device=None,
loop=None,
vad_threshold=0.5,
silence_duration=0.5,
max_chunk_size=30,
):
self.sample_rate = sample_rate
self.chunk_duration = chunk_duration
self.device = device
self.loop = loop
# VAD Configuration
self.vad_threshold = vad_threshold
self.silence_duration = silence_duration
self.max_chunk_size = max_chunk_size
self.audio_queue = asyncio.Queue()
self.is_listening = False
# Load Silero VAD model
try:
self.model, utils = torch.hub.load(
repo_or_dir="snakers4/silero-vad", model="silero_vad"
)
self.model.eval()
if torch.cuda.is_available():
self.model = self.model.cuda()
logger.info("Silero VAD model loaded successfully.")
except Exception as e:
logger.error(f"Failed to load Silero VAD model: {e}")
raise
# VAD state
self.is_collecting = False
self.silence_samples = 0
self.max_silence_samples = int(self.silence_duration * self.sample_rate)
self.max_chunk_samples = int(self.max_chunk_size * self.sample_rate)
# Pre-padding buffer (e.g., 200ms)
self.pre_padding_samples = int(0.2 * self.sample_rate)
self._ring_buffer = np.zeros(self.pre_padding_samples, dtype=np.float32)
self._ring_buffer_idx = 0
self._collection_buffer = []
def _audio_callback(self, indata, frames, time, status):
"""
This callback is called by sounddevice for every block of audio captured.
@@ -28,25 +69,77 @@ class AudioListener:
if status:
logger.warning(f"SoundDevice status: {status}")
# We capture audio in chunks. sounddevice provides blocks.
# We append these blocks to a buffer until we reach chunk_duration.
self._buffer.append(indata.copy())
# Ensure data is float32 and 1D
audio_data = indata.flatten().astype(np.float32)
# Check if we have enough data for a full chunk
current_duration = len(self._buffer) * frames / self.sample_rate
if current_duration >= self.chunk_duration:
# Concatenate all buffers into one chunk
chunk = np.concatenate(self._buffer, axis=0)
# Trim to exactly chunk_duration to maintain consistency
target_samples = int(self.sample_rate * self.chunk_duration)
chunk = chunk[:target_samples]
# 1. Update ring buffer for pre-padding
# We overwrite the oldest data in the ring buffer
num_samples = len(audio_data)
for i in range(num_samples):
self._ring_buffer[self._ring_buffer_idx] = audio_data[i]
self._ring_buffer_idx = (
self._ring_buffer_idx + 1
) % self.pre_padding_samples
# Flatten to 1D array (samples,) as expected by faster-whisper
chunk = chunk.flatten()
# 2. Run VAD
# Convert to torch tensor
tensor_input = torch.from_numpy(audio_data)
if torch.cuda.is_available():
tensor_input = tensor_input.cuda()
# Use call_soon_threadsafe to put the chunk into the asyncio queue from the callback thread
with torch.no_grad():
# The model expects (batch, samples)
# Silero VAD expects frames of 512, 1024, or 1536 for 16kHz
# Since we use block_size=512, we are good.
probability = self.model(tensor_input.unsqueeze(0), self.sample_rate).item()
# 3. State-based Chunking Logic
if probability > self.vad_threshold:
if not self.is_collecting:
# Start Detection: Transition to COLLECTING
logger.debug("Speech detected. Starting collection.")
self.is_collecting = True
# Pre-padding: Append the ring buffer in the correct order
padding = np.roll(self._ring_buffer, -self._ring_buffer_idx)
self._collection_buffer.append(padding)
# Reset silence counter
self.silence_samples = 0
self._collection_buffer.append(audio_data)
elif self.is_collecting:
# We are in COLLECTING state but current frame is silence
self._collection_buffer.append(audio_data)
self.silence_samples += num_samples
# End Detection: Silence lasted longer than threshold
if self.silence_samples >= self.max_silence_samples:
logger.debug("Silence detected. Flushing chunk.")
self._flush_buffer()
# Max Chunk Size: Force flush
elif sum(len(b) for b in self._collection_buffer) >= self.max_chunk_samples:
logger.debug("Max chunk size reached. Force flushing.")
self._flush_buffer()
else:
# IDLE state, just waiting for speech
pass
def _flush_buffer(self):
"""
Concatenates the collection buffer and puts it into the asyncio queue.
"""
if not self._collection_buffer:
return
chunk = np.concatenate(self._collection_buffer).flatten()
self.loop.call_soon_threadsafe(self.audio_queue.put_nowait, chunk)
self._buffer = []
# Reset state
self._collection_buffer = []
self.is_collecting = False
self.silence_samples = 0
def start(self):
"""
@@ -56,11 +149,11 @@ class AudioListener:
raise RuntimeError("Event loop must be provided to AudioListener")
self.is_listening = True
self._buffer = []
self._collection_buffer = []
# Define the block size for the callback
# We'll use a smaller block size (e.g. 0.1s) to keep the callback responsive
block_size = int(self.sample_rate * 0.1)
# Define the block size for the callback.
# Silero VAD v4 recommends 512 samples for 16kHz.
block_size = 512
try:
self.stream = sd.InputStream(
@@ -71,7 +164,7 @@ class AudioListener:
callback=self._audio_callback,
)
self.stream.start()
logger.info("Audio listener started.")
logger.info("Audio listener started with VAD-based chunking.")
except Exception as e:
logger.error(f"Failed to start audio listener: {e}")
self.is_listening = False
Binary file not shown.
+85 -9
View File
@@ -1,4 +1,5 @@
from typing import List, Union
import asyncio
from typing import List, Optional, Union
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
@@ -17,13 +18,13 @@ class ConfirmationApp(App):
#left-pane {
width: 40%;
border: solid 1;
border: solid;
padding: 1;
}
#right-pane {
width: 60%;
border: solid 1;
border: solid;
padding: 1;
layout: vertical;
}
@@ -43,7 +44,7 @@ class ConfirmationApp(App):
display: none;
height: auto;
layout: vertical;
border: solid 1;
border: solid;
padding: 1;
}
@@ -56,11 +57,17 @@ class ConfirmationApp(App):
("q", "quit", "Quit"),
]
def __init__(self, result: ExtractionResult):
def __init__(
self,
result: Optional[ExtractionResult] = None,
proposal_queue: Optional[asyncio.Queue] = None,
):
super().__init__()
self.result = result
self.proposal_queue = proposal_queue
self.pending_updates: List[Union[LoreUpdate, CharacterStateUpdate]] = []
if result:
# Populate pending updates from result
self.pending_updates.extend(result.lore_updates)
self.pending_updates.extend(result.character_updates)
@@ -100,6 +107,7 @@ class ConfirmationApp(App):
def on_mount(self) -> None:
table = self.query_one("#update-table", DataTable)
table.cursor_type = "row"
table.add_columns("Type", "Target", "Update")
for i, update in enumerate(self.pending_updates):
@@ -117,8 +125,72 @@ class ConfirmationApp(App):
)
table.add_row("Char", update.character_name, change_text, key=str(i))
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
self.selected_index = event.row
if self.pending_updates:
self.handle_row_highlight(0)
self.query_one("#btn-accept", Button).focus()
if self.proposal_queue:
self.run_worker(self.poll_proposal_queue, thread=False)
async def poll_proposal_queue(self) -> None:
"""
Background worker that polls the proposal queue for new extraction results.
"""
while True:
try:
result = await self.proposal_queue.get()
self.add_result(result)
except Exception as e:
# Log error but keep the worker running
self.log(f"Error polling proposal queue: {e}")
finally:
# Signal that the item has been processed
if hasattr(self.proposal_queue, "task_done"):
self.proposal_queue.task_done()
def add_result(self, result: ExtractionResult) -> None:
"""
Adds results from the LLM processor to the TUI table.
"""
table = self.query_one("#update-table", DataTable)
start_index = len(self.pending_updates)
for update in result.lore_updates + result.character_updates:
self.pending_updates.append(update)
actual_index = len(self.pending_updates) - 1
if isinstance(update, LoreUpdate):
table.add_row(
"Lore",
update.entity_name or "General",
update.content,
key=str(actual_index),
)
elif isinstance(update, CharacterStateUpdate):
change_text = f"HP: {update.hp_change or 0}"
if update.status_effects_added:
change_text += f", Added: {', '.join(update.status_effects_added)}"
if update.status_effects_removed:
change_text += (
f", Removed: {', '.join(update.status_effects_removed)}"
)
table.add_row(
"Char", update.character_name, change_text, key=str(actual_index)
)
# If the table was previously empty and we added updates, focus the first one.
if start_index == 0 and self.pending_updates:
self.handle_row_highlight(0)
self.query_one("#btn-accept", Button).focus()
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
self.handle_row_highlight(event.cursor_row)
def handle_row_highlight(self, row: int) -> None:
self.selected_index = row
if self.selected_index < 0 or self.selected_index >= len(self.pending_updates):
return
update = self.pending_updates[self.selected_index]
details_text = self.query_one("#details-text", Static)
@@ -135,7 +207,7 @@ class ConfirmationApp(App):
self.query_one("#edit-container", Vertical).styles.display = "none"
self.query_one("#details-container", Vertical).styles.display = "block"
def on_button_clicked(self, event: Button.Clicked) -> None:
def on_button_pressed(self, event: Button.Pressed) -> None:
if self.selected_index == -1:
return
@@ -237,5 +309,9 @@ class ConfirmationApp(App):
)
table.add_row("Char", update.character_name, change_text, key=str(i))
if self.pending_updates:
self.handle_row_highlight(0)
self.query_one("#btn-accept", Button).focus()
else:
self.selected_index = -1
self.query_one("#details-text", Static).update("No update selected")
self.query_one("#details-text", Static).update("All updates processed.")