import asyncio
from typing import List, Optional, Union

from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.coordinate import Coordinate
from textual.widgets import Button, DataTable, Footer, Input, Label, Static

from src.llm.models import CharacterStateUpdate, ExtractionResult, LoreUpdate
from src.persistence.characters import update_character_state
from src.persistence.lore import update_lore


class ConfirmationApp(App):
    """Three-pane TUI for accepting, rejecting or editing proposed updates.

    The left pane lists pending ``LoreUpdate`` / ``CharacterStateUpdate``
    items, the middle pane shows the most recent item received on the context
    queue, and the right pane shows details of the selected update together
    with the Accept / Reject / Edit actions.
    """

    CSS = """
    Screen {
        layout: horizontal;
    }
    #left-pane {
        width: 30%;
        border: solid;
        padding: 1;
    }
    #middle-pane {
        width: 30%;
        border: solid;
        padding: 1;
    }
    #right-pane {
        width: 40%;
        border: solid;
        padding: 1;
        layout: vertical;
    }
    #details-container {
        height: auto;
        margin-bottom: 1;
    }
    #actions-container {
        height: auto;
        layout: horizontal;
        align: center middle;
    }
    #edit-container {
        display: none;
        height: auto;
        layout: vertical;
        border: solid;
        padding: 1;
    }
    Button {
        margin: 0 1;
    }
    """

    BINDINGS = [
        ("q", "quit", "Quit"),
    ]

    # Position of the "Update" column created in on_mount ("Type", "Target", "Update").
    _UPDATE_COLUMN = 2

    def __init__(
        self,
        result: Optional[ExtractionResult] = None,
        proposal_queue: Optional[asyncio.Queue] = None,
        context_queue: Optional[asyncio.Queue] = None,
    ):
        """Create the app.

        Args:
            result: Optional initial extraction result; its lore updates and
                character updates (in that order) seed the pending list.
            proposal_queue: Optional queue polled for further extraction
                results once the app is mounted.
            context_queue: Optional queue polled for items shown in the
                context pane once the app is mounted.
        """
        super().__init__()
        self.result = result
        self.proposal_queue = proposal_queue
        self.context_queue = context_queue
        self.pending_updates: List[Union[LoreUpdate, CharacterStateUpdate]] = []
        if result:
            # Populate pending updates from result
            self.pending_updates.extend(result.lore_updates)
            self.pending_updates.extend(result.character_updates)
        # -1 means "nothing selected".
        self.selected_index = -1

    def compose(self) -> ComposeResult:
        """Build the widget tree: update table, context pane, details/actions pane."""
        yield Container(
            Horizontal(
                Vertical(
                    DataTable(id="update-table"),
                    id="left-pane",
                ),
                Vertical(
                    Static("No context available", id="context-pane"),
                    id="middle-pane",
                ),
                Vertical(
                    Vertical(
                        Label("Details:", id="details-label"),
                        Static("No update selected", id="details-text"),
                        id="details-container",
                    ),
                    Vertical(
                        Label("Edit Value:"),
                        Input(id="edit-input"),
                        Button("Save Edit", id="save-edit"),
                        id="edit-container",
                    ),
                    Horizontal(
                        Button("Accept", id="btn-accept"),
                        Button("Reject", id="btn-reject"),
                        Button("Edit", id="btn-edit"),
                        id="actions-container",
                    ),
                    id="right-pane",
                ),
            ),
            Footer(),
        )

    # ------------------------------------------------------------------
    # Formatting / view helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _change_summary(update: CharacterStateUpdate) -> str:
        """Return the one-line table summary for a character-state update."""
        parts = [f"HP: {update.hp_change or 0}"]
        if update.status_effects_added:
            parts.append(f"Added: {', '.join(update.status_effects_added)}")
        if update.status_effects_removed:
            parts.append(f"Removed: {', '.join(update.status_effects_removed)}")
        return ", ".join(parts)

    @staticmethod
    def _details_for(
        update: Union[LoreUpdate, CharacterStateUpdate],
    ) -> Optional[str]:
        """Return the multi-line details text for *update*, or None if its type is unknown."""
        if isinstance(update, LoreUpdate):
            return (
                f"Category: {update.category}\n"
                f"Target: {update.entity_name}\n"
                f"Content: {update.content}"
            )
        if isinstance(update, CharacterStateUpdate):
            return (
                f"Character: {update.character_name}\n"
                f"HP Change: {update.hp_change}\n"
                f"Added Effects: {update.status_effects_added}\n"
                f"Removed Effects: {update.status_effects_removed}"
            )
        return None

    def _add_table_row(
        self,
        table: DataTable,
        update: Union[LoreUpdate, CharacterStateUpdate],
        index: int,
    ) -> None:
        """Append the table row for *update*, keyed by its index in ``pending_updates``."""
        if isinstance(update, LoreUpdate):
            table.add_row(
                "Lore",
                update.entity_name or "General",
                update.content,
                key=str(index),
            )
        elif isinstance(update, CharacterStateUpdate):
            table.add_row(
                "Char",
                update.character_name,
                self._change_summary(update),
                key=str(index),
            )

    def _show_details_view(self) -> None:
        """Hide the edit form and show the details panel."""
        self.query_one("#edit-container", Vertical).styles.display = "none"
        self.query_one("#details-container", Vertical).styles.display = "block"

    def _select_first_pending(self) -> None:
        """Show details for the first pending update and focus the Accept button."""
        self.handle_row_highlight(0)
        self.query_one("#btn-accept", Button).focus()

    # ------------------------------------------------------------------
    # Lifecycle and background workers
    # ------------------------------------------------------------------

    def on_mount(self) -> None:
        """Fill the table with the initial updates and start the queue pollers."""
        table = self.query_one("#update-table", DataTable)
        table.cursor_type = "row"
        table.add_columns("Type", "Target", "Update")
        for i, update in enumerate(self.pending_updates):
            self._add_table_row(table, update, i)
        if self.pending_updates:
            self._select_first_pending()
        if self.proposal_queue:
            self.run_worker(self.poll_proposal_queue, thread=False)
        if self.context_queue:
            self.run_worker(self.poll_context_queue, thread=False)

    async def _consume_queue(self, queue, handler, label: str) -> None:
        """Forever take items from *queue* and pass each one to *handler*.

        Errors are logged and the loop keeps running. ``task_done()`` is
        called for every item that was actually received, even when *handler*
        raises, so the queue's unfinished-task count does not get stuck.
        """
        while True:
            try:
                item = await queue.get()
            except Exception as e:
                # Log error but keep the worker running
                self.log(f"Error polling {label} queue: {e}")
                continue
            try:
                try:
                    handler(item)
                finally:
                    # Signal that the item has been processed
                    if hasattr(queue, "task_done"):
                        queue.task_done()
            except Exception as e:
                self.log(f"Error polling {label} queue: {e}")

    def _show_context(self, update) -> None:
        """Render a context-queue item (reads ``query``, ``source``, ``snippet``)."""
        context_pane = self.query_one("#context-pane", Static)
        # Format the update for display
        display_text = (
            f"Query: {update.query}\nSource: {update.source}\n\n{update.snippet}"
        )
        context_pane.update(display_text)

    async def poll_proposal_queue(self) -> None:
        """
        Background worker that polls the proposal queue for new extraction results.
        """
        await self._consume_queue(self.proposal_queue, self.add_result, "proposal")

    async def poll_context_queue(self) -> None:
        """
        Background worker that polls the context queue for new RAG updates.
        """
        await self._consume_queue(self.context_queue, self._show_context, "context")

    def add_result(self, result: ExtractionResult) -> None:
        """
        Adds results from the LLM processor to the TUI table.
        """
        table = self.query_one("#update-table", DataTable)
        start_index = len(self.pending_updates)
        # Unpack rather than "+" so the two sequences need not be the same type.
        for update in [*result.lore_updates, *result.character_updates]:
            self.pending_updates.append(update)
            self._add_table_row(table, update, len(self.pending_updates) - 1)
        # If the table was previously empty and we added updates, focus the first one.
        if start_index == 0 and self.pending_updates:
            self._select_first_pending()

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------

    def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
        """Track the table cursor and refresh the details panel."""
        self.handle_row_highlight(event.cursor_row)

    def handle_row_highlight(self, row: int) -> None:
        """Select *row* and show its details; out-of-range rows leave the panel untouched."""
        self.selected_index = row
        if self.selected_index < 0 or self.selected_index >= len(self.pending_updates):
            return
        update = self.pending_updates[self.selected_index]
        details = self._details_for(update)
        if details is not None:
            self.query_one("#details-text", Static).update(details)
        # Reset to detail view
        self._show_details_view()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dispatch Accept / Reject / Edit / Save Edit for the selected update."""
        # selected_index may be -1 (nothing selected) or stale/out of range,
        # because handle_row_highlight stores the row before validating it.
        if not 0 <= self.selected_index < len(self.pending_updates):
            return
        update = self.pending_updates[self.selected_index]
        button_id = event.button.id
        if button_id == "btn-accept":
            if isinstance(update, LoreUpdate):
                update_lore(update)
            elif isinstance(update, CharacterStateUpdate):
                update_character_state(update)
            self.remove_update(self.selected_index)
        elif button_id == "btn-reject":
            self.remove_update(self.selected_index)
        elif button_id == "btn-edit":
            self.show_edit_mode(update)
        elif button_id == "save-edit":
            self.save_edit(update)

    # ------------------------------------------------------------------
    # Editing and removal
    # ------------------------------------------------------------------

    def show_edit_mode(self, update: Union[LoreUpdate, CharacterStateUpdate]) -> None:
        """Pre-fill the edit input from *update* and swap the details panel for the edit form."""
        edit_input = self.query_one("#edit-input", Input)
        if isinstance(update, LoreUpdate):
            edit_input.value = update.content
        elif isinstance(update, CharacterStateUpdate):
            # For simplicity, only allow editing HP change in this TUI
            edit_input.value = str(update.hp_change or 0)
        self.query_one("#edit-container", Vertical).styles.display = "block"
        self.query_one("#details-container", Vertical).styles.display = "none"

    def save_edit(self, update: Union[LoreUpdate, CharacterStateUpdate]) -> None:
        """Apply the edit input to *update* and refresh the table row and details.

        For a ``LoreUpdate`` the input replaces ``content``; for a
        ``CharacterStateUpdate`` it must parse as an integer and replaces
        ``hp_change``. On a non-integer HP value nothing is changed and the
        edit form stays open so the value can be corrected.
        """
        new_val = self.query_one("#edit-input", Input).value
        cell_text: Optional[str] = None
        if isinstance(update, LoreUpdate):
            update.content = new_val
            cell_text = update.content
        elif isinstance(update, CharacterStateUpdate):
            try:
                update.hp_change = int(new_val)
            except ValueError:
                self.log(f"Invalid HP change {new_val!r}; expected an integer")
                self.bell()
                return
            cell_text = self._change_summary(update)

        if cell_text is not None:
            # update_cell() addresses cells by row/column *key*; we only have
            # positional indices here, so use the coordinate-based variant.
            table = self.query_one("#update-table", DataTable)
            table.update_cell_at(
                Coordinate(self.selected_index, self._UPDATE_COLUMN), cell_text
            )

        # Go back to the detail view and show the edited values.
        self._show_details_view()
        details = self._details_for(update)
        if details is not None:
            self.query_one("#details-text", Static).update(details)

    def remove_update(self, index: int) -> None:
        """Drop the pending update at *index* and rebuild the table.

        The table is cleared and refilled so that row keys keep matching
        positions in ``pending_updates``. Afterwards the first remaining
        update is selected, or a completion message is shown when none remain.
        """
        # Remove from the pending list
        del self.pending_updates[index]

        # Clear and refill the table
        table = self.query_one("#update-table", DataTable)
        table.clear()
        for i, update in enumerate(self.pending_updates):
            self._add_table_row(table, update, i)

        if self.pending_updates:
            self._select_first_pending()
        else:
            self.selected_index = -1
            # Make sure a still-open edit form does not hide the final message.
            self._show_details_view()
            self.query_one("#details-text", Static).update("All updates processed.")