feat: implement core D&D helpers logic and system architecture

This commit is contained in:
2026-05-25 22:14:58 -07:00
parent 5bb483431f
commit 685586318f
36 changed files with 1137 additions and 0 deletions
View File
Binary file not shown.
+1
View File
@@ -0,0 +1 @@
# LLM Module
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+56
View File
@@ -0,0 +1,56 @@
from typing import List, Optional
from pydantic import BaseModel, Field
class LoreUpdate(BaseModel):
category: str = Field(
...,
description="Category of lore: 'NPC', 'Location', 'WorldBuilding', or 'Plot'",
)
entity_name: Optional[str] = Field(
None, description="The name of the NPC, Location, or entity being updated"
)
content: str = Field(..., description="The actual lore fact or update")
context: Optional[str] = Field(
None, description="Brief context from the conversation that led to this update"
)
class InventoryChange(BaseModel):
item: str = Field(..., description="Name of the item")
quantity: int = Field(1, description="Quantity of the item")
action: str = Field(..., description="Either 'added' or 'removed'")
class CharacterStateUpdate(BaseModel):
character_name: str = Field(
..., description="Name of the character whose state is changing"
)
hp_change: Optional[int] = Field(
None, description="Change in HP (negative for damage, positive for healing)"
)
status_effects_added: List[str] = Field(
default_factory=list,
description="List of status effects applied to the character",
)
status_effects_removed: List[str] = Field(
default_factory=list,
description="List of status effects removed from the character",
)
inventory_changes: List[InventoryChange] = Field(
default_factory=list,
description="List of items added or removed from inventory",
)
class ExtractionResult(BaseModel):
lore_updates: List[LoreUpdate] = Field(
default_factory=list, description="List of discovered lore facts"
)
character_updates: List[CharacterStateUpdate] = Field(
default_factory=list, description="List of character state changes"
)
significant_events: List[str] = Field(
default_factory=list, description="List of significant plot points or events"
)
+92
View File
@@ -0,0 +1,92 @@
import os
from typing import Any, Dict, Optional
from openai import OpenAI
from pydantic import ValidationError
from .models import ExtractionResult
from .prompts import EXTRACTION_SYSTEM_PROMPT, NOISE_FILTER_SYSTEM_PROMPT
class LLMProcessor:
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model: str = "gpt-4o",
):
"""
Initializes the LLMProcessor.
:param api_key: OpenAI API key. If None, it looks for OPENAI_API_KEY in environment variables.
:param base_url: OpenAI-compatible base URL (e.g., for vLLM).
:param model: The model to use for processing.
"""
self.client = OpenAI(
api_key=api_key or os.environ.get("OPENAI_API_KEY"),
base_url=base_url or os.environ.get("OPENAI_BASE_URL"),
)
self.model = model
def _call_llm(
self,
system_prompt: str,
user_prompt: str,
response_format: Optional[Any] = None,
) -> str:
"""
Generic method to call the LLM.
"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
response_format=response_format,
)
return response.choices[0].message.content
except Exception as e:
print(f"LLM Error: {e}")
return ""
def filter_transcript(self, text: str) -> str:
"""
Stage 1: Raw Transcript -> Filtered Text.
"""
return self._call_llm(NOISE_FILTER_SYSTEM_PROMPT, text)
def extract_structured_data(self, filtered_text: str) -> ExtractionResult:
"""
Stage 2: Filtered Text -> Structured Data.
"""
# We use OpenAI's structured output (JSON mode/tool calling) via Pydantic's response_format.
# For models that support it, we can pass the Pydantic model directly.
# If we are using an older model or vLLM, we might need to manually parse the JSON.
# Using the newer 'beta.chat.completions.parse' for Pydantic support
try:
completion = self.client.beta.chat.completions.parse(
model=self.model,
messages=[
{"role": "system", "content": EXTRACTION_SYSTEM_PROMPT},
{"role": "user", "content": filtered_text},
],
response_format=ExtractionResult,
)
return completion.choices[0].message.parsed
except Exception as e:
print(f"Extraction Error: {e}")
# Return an empty ExtractionResult if parsing fails
return ExtractionResult()
def process_pipeline(self, raw_text: str) -> ExtractionResult:
"""
Executes the two-stage pipeline: Raw Transcript -> Filtered Text -> Structured Data.
"""
filtered_text = self.filter_transcript(raw_text)
if not filtered_text:
return ExtractionResult()
return self.extract_structured_data(filtered_text)
+20
View File
@@ -0,0 +1,20 @@
# System prompts for the LLM pipeline
NOISE_FILTER_SYSTEM_PROMPT = """
You are a D&D Game Master's assistant. Given a transcript, remove all out-of-character (OOC) chatter, logistical discussions (e.g., 'Where is my d20?'), and non-relevant noise.
Output only the in-character dialogue and game-relevant events.
Keep the original speakers' names if they are present in the transcript.
Do not add any commentary or summaries. Just filter the text.
"""
EXTRACTION_SYSTEM_PROMPT = """
You are a D&D session analyzer. Your goal is to extract structured data from a filtered transcript.
Extract any changes to character states (HP, status effects, inventory) and any new lore facts (NPCs, locations, world-building).
Guidelines:
1. Lore: Identify any new information about the world, people, and places.
2. Character State: Look for mentions of damage, healing, or items being gained or lost.
3. Events: Note significant plot developments.
Be precise. If no relevant information is found, return empty lists.
"""
+1
View File
@@ -0,0 +1 @@
# Persistence module for D&D Helpers.
Binary file not shown.
Binary file not shown.
+90
View File
@@ -0,0 +1,90 @@
import json
from pathlib import Path
from typing import Any, Dict, Optional
from src.llm.models import CharacterStateUpdate
DATA_CHARS_DIR = Path("data/chars")
def ensure_chars_dir():
"""Ensures the character data directory exists."""
DATA_CHARS_DIR.mkdir(parents=True, exist_ok=True)
def get_character_state(character_name: str) -> Dict[str, Any]:
"""
Reads character state from a JSON file.
If the character doesn't exist, returns a default state.
"""
ensure_chars_dir()
file_path = DATA_CHARS_DIR / f"{character_name}.json"
if not file_path.exists():
return {
"character_name": character_name,
"stats": {"hp": 0, "max_hp": 0, "ac": 0},
"status_effects": [],
"inventory": [],
}
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
def update_character_state(update: CharacterStateUpdate):
"""
Updates character state based on a CharacterStateUpdate model.
Reads the current state, applies changes, and writes it back.
"""
ensure_chars_dir()
state = get_character_state(update.character_name)
# Update HP
if update.hp_change is not None:
current_hp = state.get("stats", {}).get("hp", 0)
state["stats"]["hp"] = current_hp + update.hp_change
# Update Status Effects
status_effects = set(state.get("status_effects", []))
for effect in update.status_effects_added:
status_effects.add(effect)
for effect in update.status_effects_removed:
status_effects.discard(effect)
state["status_effects"] = list(status_effects)
# Update Inventory
inventory = state.get("inventory", [])
for change in update.inventory_changes:
# Find if item already exists
item_exists = False
for item in inventory:
if item["item"] == change.item:
if change.action == "added":
item["quantity"] = item.get("quantity", 1) + change.quantity
elif change.action == "removed":
item["quantity"] = max(0, item.get("quantity", 1) - change.quantity)
item_exists = True
break
if not item_exists:
if change.action == "added":
inventory.append(
{
"item": change.item,
"quantity": change.quantity,
"weight": 0, # Default weight if not provided
}
)
elif change.action == "removed":
# Item not in inventory, do nothing or log
pass
state["inventory"] = inventory
file_path = DATA_CHARS_DIR / f"{update.character_name}.json"
with open(file_path, "w", encoding="utf-8") as f:
json.dump(state, f, indent=4)
return str(file_path)
+55
View File
@@ -0,0 +1,55 @@
import os
from pathlib import Path
from typing import Optional
from src.llm.models import LoreUpdate
DATA_LORE_DIR = Path("data/lore")
CATEGORY_MAP = {
"NPC": "NPCs",
"Location": "Locations",
"WorldBuilding": "World",
"Plot": "Timeline",
}
def ensure_lore_dir():
"""Ensures the lore data directory and subdirectories exist."""
DATA_LORE_DIR.mkdir(parents=True, exist_ok=True)
for folder in CATEGORY_MAP.values():
if folder != "Timeline":
(DATA_LORE_DIR / folder).mkdir(parents=True, exist_ok=True)
def update_lore(update: LoreUpdate):
"""
Updates lore based on a LoreUpdate model.
- For NPC/Location/WorldBuilding: Updates the specific entity's file.
- For Plot: Appends to Timeline.md.
"""
ensure_lore_dir()
category = update.category
folder = CATEGORY_MAP.get(category, "Other")
if category == "Plot":
file_path = DATA_LORE_DIR / "Timeline.md"
content_to_append = f"- {update.content}\n"
elif update.entity_name:
category_folder = CATEGORY_MAP.get(category, "Other")
file_path = DATA_LORE_DIR / category_folder / f"{update.entity_name}.md"
# For entity-specific files, we can append the content as a list item.
# In a more complex system, we might check for existing headers.
content_to_append = f"- {update.content}\n"
else:
# Fallback if no entity name is provided for a category that expects one.
# We'll put it in a general file for that category.
category_folder = CATEGORY_MAP.get(category, "Other")
file_path = DATA_LORE_DIR / category_folder / "General.md"
content_to_append = f"- {update.content}\n"
with open(file_path, "a", encoding="utf-8") as f:
f.write(content_to_append)
return str(file_path)
+155
View File
@@ -0,0 +1,155 @@
import asyncio
import logging
from src.llm.models import ExtractionResult
from src.llm.processor import LLMProcessor
from src.stt.listener import AudioListener
from src.stt.transcriber import Transcriber
from src.ui.tui import ConfirmationApp
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PipelineOrchestrator:
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop = loop
# Modules
self.listener = AudioListener(loop=self.loop)
self.transcriber = Transcriber()
self.processor = LLMProcessor()
# Queues
self.transcript_queue = asyncio.Queue()
self.proposal_queue = asyncio.Queue()
self.is_running = False
async def stt_worker(self):
"""
Worker that handles STT: Audio -> Text.
"""
logger.info("STT Worker started.")
while self.is_running:
try:
# Get audio chunk from listener
audio_chunk = await self.listener.get_chunk()
# Transcribe
text = self.transcriber.transcribe(audio_chunk)
if text:
logger.info(f"Transcribed: {text}")
await self.transcript_queue.put(text)
except Exception as e:
logger.error(f"STT Worker error: {e}")
# Small sleep to prevent tight loop if get_chunk is fast
await asyncio.sleep(0.1)
async def llm_worker(self):
"""
Worker that handles LLM: Text -> Proposal.
"""
logger.info("LLM Worker started.")
while self.is_running:
try:
# Get raw text from transcript queue
raw_text = await self.transcript_queue.get()
logger.info(f"Processing text: {raw_text}")
# Process via LLM (Filter -> Extract)
result = self.processor.process_pipeline(raw_text)
if (
result.lore_updates
or result.character_updates
or result.significant_events
):
logger.info("Proposal generated. Putting into proposal queue.")
await self.proposal_queue.put(result)
else:
logger.info("No relevant game data extracted.")
except Exception as e:
logger.error(f"LLM Worker error: {e}")
# Small sleep
await asyncio.sleep(0.1)
async def tui_worker(self):
"""
Worker that handles TUI: Proposal -> Persistence.
"""
logger.info("TUI Worker started.")
while self.is_running:
try:
# Get proposal from queue
result = await self.proposal_queue.get()
logger.info("Proposal received. Launching TUI for confirmation.")
# Launch TUI (Note: Textual's run() is blocking)
# We need to run the TUI in a way that doesn't block the overall event loop
# or we accept that the system pauses for confirmation.
# Given the requirement for "Non-blocking", but TUI is a focus-modal,
# we launch it.
# To integrate Textual with asyncio, we can use its async support.
# However, ConfirmationApp is designed as a standard Textual app.
# Since we want to bridge the asyncio loop, we'll run the TUI.
# Note: In a real high-performance pipeline, we'd use an async TUI
# that updates widgets in real-time. For now, we follow the
# a confirmation screen pattern.
# we will use the run() method, but since we are in an async loop,
# we might need to wrap it or use an async variant.
# For this integration, we'll use the run() method as defined
# in ConfirmationApp, which will take over the terminal.
ConfirmationApp(result).run()
except Exception as e:
logger.error(f"TUI Worker error: {e}")
# Small sleep
await asyncio.sleep(0.1)
async def run(self):
"""
Starts the pipeline workers and the audio listener.
"""
self.is_running = True
self.listener.start()
# Start workers as background tasks
tasks = [
asyncio.create_task(self.stt_worker()),
asyncio.create_task(self.llm_worker()),
asyncio.create_task(self.tui_worker()),
]
try:
# Keep the loop running
while self.is_running:
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
finally:
self.is_running = False
self.listener.stop()
for task in tasks:
task.cancel()
# Wait for tasks to complete
await asyncio.gather(*tasks, return_exceptions=True)
def stop(self):
"""
Stops the pipeline.
"""
self.is_running = False
+1
View File
@@ -0,0 +1 @@
# STT Module
Binary file not shown.
Binary file not shown.
Binary file not shown.
+91
View File
@@ -0,0 +1,91 @@
import asyncio
import logging
import numpy as np
import sounddevice as sd
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AudioListener:
"""
Captures audio from the microphone in chunks and puts them into an asyncio queue.
"""
def __init__(self, sample_rate=16000, chunk_duration=3, device=None, loop=None):
self.sample_rate = sample_rate
self.chunk_duration = chunk_duration
self.device = device
self.loop = loop
self.audio_queue = asyncio.Queue()
self.is_listening = False
def _audio_callback(self, indata, frames, time, status):
"""
This callback is called by sounddevice for every block of audio captured.
"""
if status:
logger.warning(f"SoundDevice status: {status}")
# We capture audio in chunks. sounddevice provides blocks.
# We append these blocks to a buffer until we reach chunk_duration.
self._buffer.append(indata.copy())
# Check if we have enough data for a full chunk
current_duration = len(self._buffer) * frames / self.sample_rate
if current_duration >= self.chunk_duration:
# Concatenate all buffers into one chunk
chunk = np.concatenate(self._buffer, axis=0)
# Trim to exactly chunk_duration to maintain consistency
target_samples = int(self.sample_rate * self.chunk_duration)
chunk = chunk[:target_samples]
# Use call_soon_threadsafe to put the chunk into the asyncio queue from the callback thread
self.loop.call_soon_threadsafe(self.audio_queue.put_nowait, chunk)
self._buffer = []
def start(self):
"""
Starts the audio capture stream.
"""
if self.loop is None:
raise RuntimeError("Event loop must be provided to AudioListener")
self.is_listening = True
self._buffer = []
# Define the block size for the callback
# We'll use a smaller block size (e.g. 0.1s) to keep the callback responsive
block_size = int(self.sample_rate * 0.1)
try:
self.stream = sd.InputStream(
device=self.device,
channels=1,
samplerate=self.sample_rate,
blocksize=block_size,
callback=self._audio_callback,
)
self.stream.start()
logger.info("Audio listener started.")
except Exception as e:
logger.error(f"Failed to start audio listener: {e}")
self.is_listening = False
raise
def stop(self):
"""
Stops the audio capture stream.
"""
if hasattr(self, "stream"):
self.stream.stop()
self.stream.close()
self.is_listening = False
logger.info("Audio listener stopped.")
async def get_chunk(self):
"""
Retrieves a chunk of audio from the queue asynchronously.
"""
return await self.audio_queue.get()
+69
View File
@@ -0,0 +1,69 @@
import logging
from faster_whisper import WhisperModel
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Transcriber:
"""
Converts audio chunks (numpy arrays) into text using faster-whisper.
"""
def __init__(self, model_size="base", device="cpu", compute_type="int8"):
"""
Initializes the faster-whisper model.
Args:
model_size (str): The size of the model to use (e.g., "tiny", "base", "small").
device (str): The device to run the model on ("cpu" or "cuda").
compute_type (str): The compute type to use (e.g., "int8", "float16").
"""
logger.info(
f"Loading faster-whisper model: {model_size} on {device} ({compute_type})..."
)
try:
self.model = WhisperModel(
model_size, device=device, compute_type=compute_type
)
logger.info("Model loaded successfully.")
except Exception as e:
logger.error(f"Failed to load faster-whisper model: {e}")
raise
def transcribe(self, audio_chunk):
"""
Transcribes a single audio chunk.
Args:
audio_chunk (np.ndarray): The audio data as a numpy array.
Returns:
str: The transcribed text.
"""
if audio_chunk is None:
return ""
try:
# faster-whisper expects audio in float32
audio_data = audio_chunk.astype("float32")
# Transcribe the audio
segments, info = self.model.transcribe(audio_data, beam_size=5)
# Combine segments into a single string
text = " ".join([segment.text.strip() for segment in segments])
return text.strip()
except Exception as e:
logger.error(f"Transcription error: {e}")
return ""
def close(self):
"""
Explicitly release model resources if necessary.
"""
# faster-whisper's WhisperModel doesn't have a standard close(),
# but we'll provide this for consistency.
pass
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
+55
View File
@@ -0,0 +1,55 @@
import asyncio
from typing import List
import typer
from src.llm.models import CharacterStateUpdate, ExtractionResult, LoreUpdate
from src.pipeline.orchestrator import PipelineOrchestrator
from src.ui.tui import ConfirmationApp
app = typer.Typer(help="D&D Helpers CLI")
@app.command()
def run():
"""
Start the main D&D Helpers pipeline.
"""
typer.echo("Starting D&D Helpers pipeline...")
loop = asyncio.get_event_loop()
orchestrator = PipelineOrchestrator(loop=loop)
try:
loop.run_until_complete(orchestrator.run())
except KeyboardInterrupt:
orchestrator.stop()
loop.run_until_complete(asyncio.sleep(0)) # Give it a moment to cleanup
typer.echo("Pipeline stopped.")
@app.command()
def confirm(
lore_text: List[str] = typer.Argument(..., help="Sample lore updates"),
char_name: str = typer.Option("Grog", help="Character name for sample update"),
hp_change: int = typer.Option(0, help="Sample HP change"),
):
"""
Test the confirmation TUI with sample data.
"""
# Mocking an ExtractionResult
lore_updates = [
LoreUpdate(category="NPC", entity_name="Torm", content=text)
for text in lore_text
]
char_updates = [CharacterStateUpdate(character_name=char_name, hp_change=hp_change)]
result = ExtractionResult(lore_updates=lore_updates, character_updates=char_updates)
# Launch the TUI
ConfirmationApp(result).run()
if __name__ == "__main__":
app()
+241
View File
@@ -0,0 +1,241 @@
from typing import List, Union
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, DataTable, Footer, Input, Label, Static
from src.llm.models import CharacterStateUpdate, ExtractionResult, LoreUpdate
from src.persistence.characters import update_character_state
from src.persistence.lore import update_lore
class ConfirmationApp(App):
CSS = """
Screen {
layout: horizontal;
}
#left-pane {
width: 40%;
border: solid 1;
padding: 1;
}
#right-pane {
width: 60%;
border: solid 1;
padding: 1;
layout: vertical;
}
#details-container {
height: auto;
margin-bottom: 1;
}
#actions-container {
height: auto;
layout: horizontal;
align: center middle;
}
#edit-container {
display: none;
height: auto;
layout: vertical;
border: solid 1;
padding: 1;
}
Button {
margin: 0 1;
}
"""
BINDINGS = [
("q", "quit", "Quit"),
]
def __init__(self, result: ExtractionResult):
super().__init__()
self.result = result
self.pending_updates: List[Union[LoreUpdate, CharacterStateUpdate]] = []
# Populate pending updates from result
self.pending_updates.extend(result.lore_updates)
self.pending_updates.extend(result.character_updates)
self.selected_index = -1
def compose(self) -> ComposeResult:
yield Container(
Horizontal(
Vertical(
DataTable(id="update-table"),
id="left-pane",
),
Vertical(
Vertical(
Label("Details:", id="details-label"),
Static("No update selected", id="details-text"),
id="details-container",
),
Vertical(
Label("Edit Value:"),
Input(id="edit-input"),
Button("Save Edit", id="save-edit"),
id="edit-container",
),
Horizontal(
Button("Accept", id="btn-accept"),
Button("Reject", id="btn-reject"),
Button("Edit", id="btn-edit"),
id="actions-container",
),
id="right-pane",
),
),
Footer(),
)
def on_mount(self) -> None:
table = self.query_one("#update-table", DataTable)
table.add_columns("Type", "Target", "Update")
for i, update in enumerate(self.pending_updates):
if isinstance(update, LoreUpdate):
table.add_row(
"Lore", update.entity_name or "General", update.content, key=str(i)
)
elif isinstance(update, CharacterStateUpdate):
change_text = f"HP: {update.hp_change or 0}"
if update.status_effects_added:
change_text += f", Added: {', '.join(update.status_effects_added)}"
if update.status_effects_removed:
change_text += (
f", Removed: {', '.join(update.status_effects_removed)}"
)
table.add_row("Char", update.character_name, change_text, key=str(i))
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
self.selected_index = event.row
update = self.pending_updates[self.selected_index]
details_text = self.query_one("#details-text", Static)
if isinstance(update, LoreUpdate):
details_text.update(
f"Category: {update.category}\nTarget: {update.entity_name}\nContent: {update.content}"
)
elif isinstance(update, CharacterStateUpdate):
details_text.update(
f"Character: {update.character_name}\nHP Change: {update.hp_change}\nAdded Effects: {update.status_effects_added}\nRemoved Effects: {update.status_effects_removed}"
)
# Reset to detail view
self.query_one("#edit-container", Vertical).styles.display = "none"
self.query_one("#details-container", Vertical).styles.display = "block"
def on_button_clicked(self, event: Button.Clicked) -> None:
if self.selected_index == -1:
return
update = self.pending_updates[self.selected_index]
if event.button.id == "btn-accept":
if isinstance(update, LoreUpdate):
update_lore(update)
elif isinstance(update, CharacterStateUpdate):
update_character_state(update)
self.remove_update(self.selected_index)
elif event.button.id == "btn-reject":
self.remove_update(self.selected_index)
elif event.button.id == "btn-edit":
self.show_edit_mode(update)
elif event.button.id == "save-edit":
self.save_edit(update)
def show_edit_mode(self, update: Union[LoreUpdate, CharacterStateUpdate]) -> None:
edit_input = self.query_one("#edit-input", Input)
if isinstance(update, LoreUpdate):
edit_input.value = update.content
elif isinstance(update, CharacterStateUpdate):
# For simplicity, only allow editing HP change in this TUI
edit_input.value = str(update.hp_change or 0)
self.query_one("#edit-container", Vertical).styles.display = "block"
self.query_one("#details-container", Vertical).styles.display = "none"
def save_edit(self, update: Union[LoreUpdate, CharacterStateUpdate]) -> None:
new_val = self.query_one("#edit-input", Input).value
if isinstance(update, LoreUpdate):
update.content = new_val
elif isinstance(update, CharacterStateUpdate):
try:
update.hp_change = int(new_val)
except ValueError:
# Ignore invalid integer input
pass
# Refresh the table
table = self.query_one("#update-table", DataTable)
# Textual DataTable doesn't have a simple 'update_row', so we clear and refill
# or we can use update_cell.
# Update the table row
if isinstance(update, LoreUpdate):
table.update_cell(self.selected_index, 2, update.content)
elif isinstance(update, CharacterStateUpdate):
change_text = f"HP: {update.hp_change or 0}"
if update.status_effects_added:
change_text += f", Added: {', '.join(update.status_effects_added)}"
if update.status_effects_removed:
change_text += f", Removed: {', '.join(update.status_effects_removed)}"
table.update_cell(self.selected_index, 2, change_text)
self.show_edit_mode(update) # just to refresh the value maybe? No,
# Actually let's go back to detail view
self.query_one("#edit-container", Vertical).styles.display = "none"
self.query_one("#details-container", Vertical).styles.display = "block"
# Update details text
details_text = self.query_one("#details-text", Static)
if isinstance(update, LoreUpdate):
details_text.update(
f"Category: {update.category}\nTarget: {update.entity_name}\nContent: {update.content}"
)
elif isinstance(update, CharacterStateUpdate):
details_text.update(
f"Character: {update.character_name}\nHP Change: {update.hp_change}\nAdded Effects: {update.status_effects_added}\nRemoved Effects: {update.status_effects_removed}"
)
def remove_update(self, index: int) -> None:
# Remove from the pending list
del self.pending_updates[index]
# Clear and refill the table
table = self.query_one("#update-table", DataTable)
table.clear()
for i, update in enumerate(self.pending_updates):
if isinstance(update, LoreUpdate):
table.add_row(
"Lore", update.entity_name or "General", update.content, key=str(i)
)
elif isinstance(update, CharacterStateUpdate):
change_text = f"HP: {update.hp_change or 0}"
if update.status_effects_added:
change_text += f", Added: {', '.join(update.status_effects_added)}"
if update.status_effects_removed:
change_text += (
f", Removed: {', '.join(update.status_effects_removed)}"
)
table.add_row("Char", update.character_name, change_text, key=str(i))
self.selected_index = -1
self.query_one("#details-text", Static).update("No update selected")