Update to Menu and Action

This commit is contained in:
2025-03-27 22:02:16 -04:00
parent 03af4f8077
commit 7443084809
10 changed files with 718 additions and 340 deletions

View File

@ -12,168 +12,34 @@ formatted and visually appealing way.
This class also uses the `prompt_toolkit` library to handle
user input and create an interactive experience.
"""
import asyncio
import logging
from functools import cached_property
from itertools import islice
from typing import Any, Callable
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.formatted_text import AnyFormattedText
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.shortcuts import confirm
from prompt_toolkit.validation import Validator
from pydantic import BaseModel, Field, field_validator, PrivateAttr
from rich import box
from rich.console import Console
from rich.markdown import Markdown
from rich.table import Table
from action import BaseAction
from bottom_bar import BottomBar
from colors import get_nord_theme
from hook_manager import HookManager
from menu_utils import (CaseInsensitiveDict, InvalidActionError, MenuError,
NotAMenuError, OptionAlreadyExistsError, chunks, run_async)
from one_colors import OneColors
from option import Option
logger = logging.getLogger("menu")
def chunks(iterator, size):
"""Yield successive n-sized chunks from an iterator."""
iterator = iter(iterator)
while True:
chunk = list(islice(iterator, size))
if not chunk:
break
yield chunk
class MenuError(Exception):
"""Custom exception for the Menu class."""
class OptionAlreadyExistsError(MenuError):
"""Exception raised when an option with the same key already exists in the menu."""
class InvalidHookError(MenuError):
"""Exception raised when a hook is not callable."""
class InvalidActionError(MenuError):
"""Exception raised when an action is not callable."""
class NotAMenuError(MenuError):
"""Exception raised when the provided submenu is not an instance of Menu."""
class CaseInsensitiveDict(dict):
"""A case-insensitive dictionary that treats all keys as uppercase."""
def __setitem__(self, key, value):
super().__setitem__(key.upper(), value)
def __getitem__(self, key):
return super().__getitem__(key.upper())
def __contains__(self, key):
return super().__contains__(key.upper())
def get(self, key, default=None):
return super().get(key.upper(), default)
def pop(self, key, default=None):
return super().pop(key.upper(), default)
def update(self, other=None, **kwargs):
if other:
other = {k.upper(): v for k, v in other.items()}
kwargs = {k.upper(): v for k, v in kwargs.items()}
super().update(other, **kwargs)
class Hooks(BaseModel):
"""Class to manage hooks for the menu and options."""
hooks: list[Callable[["Option"], None]] | list[Callable[["Option", Exception], None]] = Field(
default_factory=list
)
@field_validator("hooks", mode="before")
@classmethod
def validate_hooks(cls, hooks):
if hooks is None:
return []
if not all(callable(hook) for hook in hooks):
raise InvalidHookError("All hooks must be callable.")
return hooks
def add_hook(self, hook: Callable[["Option"], None] | Callable[["Option", Exception], None]) -> None:
"""Add a hook to the list."""
if not callable(hook):
raise InvalidHookError("Hook must be a callable.")
if hook not in self.hooks:
self.hooks.append(hook)
def run_hooks(self, *args, **kwargs) -> None:
"""Run all hooks with the given arguments."""
for hook in self.hooks:
try:
hook(*args, **kwargs)
except Exception as hook_error:
logger.exception(f"Hook '{hook.__name__}': {hook_error}")
class Option(BaseModel):
"""Class representing an option in the menu.
Hooks must have the signature:
def hook(option: Option) -> None:
where `option` is the selected option.
Error hooks must have the signature:
def error_hook(option: Option, error: Exception) -> None:
where `option` is the selected option and `error` is the exception raised.
"""
key: str
description: str
action: Callable[[], Any] = lambda: None
color: str = OneColors.WHITE
confirm: bool = False
confirm_message: str = "Are you sure?"
spinner: bool = False
spinner_message: str = "Processing..."
spinner_type: str = "dots"
spinner_style: str = OneColors.CYAN
spinner_kwargs: dict[str, Any] = Field(default_factory=dict)
before_action: Hooks = Field(default_factory=Hooks)
after_action: Hooks = Field(default_factory=Hooks)
on_error: Hooks = Field(default_factory=Hooks)
_start_time: float | None = PrivateAttr(default=None)
_end_time: float | None = PrivateAttr(default=None)
_duration: float | None = PrivateAttr(default=None)
_result: Any | None = PrivateAttr(default=None)
def __str__(self):
return f"Option(key='{self.key}', description='{self.description}')"
def set_result(self, result: Any) -> None:
"""Set the result of the action."""
self._result = result
def get_result(self) -> Any:
"""Get the result of the action."""
return self._result
@field_validator("action")
def validate_action(cls, action):
if not callable(action):
raise InvalidActionError("Action must be a callable.")
return action
class Menu:
"""Class to create a menu with options.
@ -218,21 +84,21 @@ class Menu:
self.title: str | Markdown = title
self.prompt: str | AnyFormattedText = prompt
self.columns: int = columns
self.bottom_bar: str | Callable[[], None] | None = bottom_bar
self.bottom_bar: str | Callable[[], None] | None = bottom_bar or BottomBar(columns=columns)
self.options: dict[str, Option] = CaseInsensitiveDict()
self.back_option: Option = self._get_back_option()
self.console: Console = Console(color_system="truecolor", theme=get_nord_theme())
self.session: PromptSession = self._get_prompt_session()
#self.session: PromptSession = self._get_prompt_session()
self.welcome_message: str | Markdown = welcome_message
self.exit_message: str | Markdown = exit_message
self.before_action: Hooks = Hooks()
self.after_action: Hooks = Hooks()
self.on_error: Hooks = Hooks()
self.hooks: HookManager = HookManager()
self.run_hooks_on_back_option: bool = run_hooks_on_back_option
self.continue_on_error_prompt: bool = continue_on_error_prompt
self._never_confirm: bool = never_confirm
self._verbose: bool = _verbose
self.last_run_option: Option | None = None
self.key_bindings: KeyBindings = KeyBindings()
self.toggles: dict[str, str] = {}
def get_title(self) -> str:
"""Returns the string title of the menu."""
@ -271,7 +137,8 @@ class Menu:
self.session.validator = self._get_validator()
self._invalidate_table_cache()
def _get_prompt_session(self) -> PromptSession:
@cached_property
def session(self) -> PromptSession:
"""Returns the prompt session for the menu."""
return PromptSession(
message=self.prompt,
@ -279,35 +146,50 @@ class Menu:
completer=self._get_completer(),
reserve_space_for_menu=1,
validator=self._get_validator(),
bottom_toolbar=self.bottom_bar,
bottom_toolbar=self.bottom_bar.render,
)
def add_before(self, hook: Callable[["Option"], None]) -> None:
"""Adds a hook to be executed before the action of the menu."""
self.before_action.add_hook(hook)
def add_toggle(self, key: str, label: str, state: bool = False):
if key in self.options or key in self.toggles:
raise ValueError(f"Key '{key}' is already in use.")
def add_after(self, hook: Callable[["Option"], None]) -> None:
"""Adds a hook to be executed after the action of the menu."""
self.after_action.add_hook(hook)
self.toggles[key] = label
self.bottom_bar.add_toggle(label, label, state)
def add_on_error(self, hook: Callable[["Option", Exception], None]) -> None:
"""Adds a hook to be executed on error of the menu."""
self.on_error.add_hook(hook)
@self.key_bindings.add(key)
def _(event):
current = self.bottom_bar._states[label][1]
self.bottom_bar.update_toggle(label, not current)
self.console.print(f"Toggled [{label}] to {'ON' if not current else 'OFF'}")
def add_counter(self, name: str, label: str, current: int, total: int):
self.bottom_bar.add_counter(name, label, current, total)
def update_counter(self, name: str, current: int | None = None, total: int | None = None):
self.bottom_bar.update_counter(name, current=current, total=total)
def update_toggle(self, name: str, state: bool):
self.bottom_bar.update_toggle(name, state)
def debug_hooks(self) -> None:
if not self._verbose:
return
logger.debug(f"Menu-level before hooks: {[hook.__name__ for hook in self.before_action.hooks]}")
logger.debug(f"Menu-level after hooks: {[hook.__name__ for hook in self.after_action.hooks]}")
logger.debug(f"Menu-level error hooks: {[hook.__name__ for hook in self.on_error.hooks]}")
def hook_names(hook_list):
return [hook.__name__ for hook in hook_list]
logger.debug(f"Menu-level before hooks: {hook_names(self.hooks._hooks['before'])}")
logger.debug(f"Menu-level after hooks: {hook_names(self.hooks._hooks['after'])}")
logger.debug(f"Menu-level error hooks: {hook_names(self.hooks._hooks['on_error'])}")
for key, option in self.options.items():
logger.debug(f"[Option '{key}'] before: {[hook.__name__ for hook in option.before_action.hooks]}")
logger.debug(f"[Option '{key}'] after: {[hook.__name__ for hook in option.after_action.hooks]}")
logger.debug(f"[Option '{key}'] error: {[hook.__name__ for hook in option.on_error.hooks]}")
logger.debug(f"[Option '{key}'] before: {hook_names(option.hooks._hooks['before'])}")
logger.debug(f"[Option '{key}'] after: {hook_names(option.hooks._hooks['after'])}")
logger.debug(f"[Option '{key}'] error: {hook_names(option.hooks._hooks['on_error'])}")
def _validate_option_key(self, key: str) -> None:
"""Validates the option key to ensure it is unique."""
if key in self.options or key.upper() == self.back_option.key.upper():
if key.upper() in self.options or key.upper() == self.back_option.key.upper():
raise OptionAlreadyExistsError(f"Option with key '{key}' already exists.")
def update_back_option(
@ -350,7 +232,7 @@ class Menu:
self,
key: str,
description: str,
action: Callable[[], Any],
action: BaseAction | Callable[[], Any],
color: str = OneColors.WHITE,
confirm: bool = False,
confirm_message: str = "Are you sure?",
@ -358,15 +240,14 @@ class Menu:
spinner_message: str = "Processing...",
spinner_type: str = "dots",
spinner_style: str = OneColors.CYAN,
spinner_kwargs: dict[str, Any] = None,
before_hooks: list[Callable[[Option], None]] = None,
after_hooks: list[Callable[[Option], None]] = None,
error_hooks: list[Callable[[Option, Exception], None]] = None,
spinner_kwargs: dict[str, Any] | None = None,
before_hooks: list[Callable] | None = None,
after_hooks: list[Callable] | None = None,
error_hooks: list[Callable] | None = None,
) -> Option:
"""Adds an option to the menu, preventing duplicates."""
spinner_kwargs: dict[str, Any] = spinner_kwargs or {}
self._validate_option_key(key)
if not spinner_kwargs:
spinner_kwargs = {}
option = Option(
key=key,
description=description,
@ -379,10 +260,15 @@ class Menu:
spinner_type=spinner_type,
spinner_style=spinner_style,
spinner_kwargs=spinner_kwargs,
before_action=Hooks(hooks=before_hooks),
after_action=Hooks(hooks=after_hooks),
on_error=Hooks(hooks=error_hooks),
)
for hook in before_hooks or []:
option.hooks.register("before", hook)
for hook in after_hooks or []:
option.hooks.register("after", hook)
for hook in error_hooks or []:
option.hooks.register("on_error", hook)
self.options[key] = option
self._refresh_session()
return option
@ -405,15 +291,20 @@ class Menu:
return self.back_option
return self.options.get(choice)
def _should_hooks_run(self, selected_option: Option) -> bool:
"""Determines if hooks should be run based on the selected option."""
return selected_option != self.back_option or self.run_hooks_on_back_option
def _should_run_action(self, selected_option: Option) -> bool:
if selected_option.confirm and not self._never_confirm:
return confirm(selected_option.confirm_message)
return True
def _create_context(self, selected_option: Option) -> dict[str, Any]:
"""Creates a context dictionary for the selected option."""
return {
"name": selected_option.description,
"option": selected_option,
"args": (),
"kwargs": {},
}
def _run_action_with_spinner(self, option: Option) -> Any:
"""Runs the action of the selected option with a spinner."""
with self.console.status(
@ -422,15 +313,13 @@ class Menu:
spinner_style=option.spinner_style,
**option.spinner_kwargs,
):
return option.action()
return option()
def _handle_action_error(self, selected_option: Option, error: Exception) -> bool:
"""Handles errors that occur during the action of the selected option."""
logger.exception(f"Error executing '{selected_option.description}': {error}")
self.console.print(f"[{OneColors.DARK_RED}]An error occurred while executing "
f"{selected_option.description}:[/] {error}")
selected_option.on_error.run_hooks(selected_option, error)
self.on_error.run_hooks(selected_option, error)
if self.continue_on_error_prompt and not self._never_confirm:
return confirm("An error occurred. Do you wish to continue?")
if self._never_confirm:
@ -442,25 +331,38 @@ class Menu:
choice = self.session.prompt()
selected_option = self.get_option(choice)
self.last_run_option = selected_option
should_hooks_run = self._should_hooks_run(selected_option)
if selected_option == self.back_option:
logger.info(f"🔙 Back selected: exiting {self.get_title()}")
return False
if not self._should_run_action(selected_option):
logger.info(f"[{OneColors.DARK_RED}] {selected_option.description} cancelled.")
return True
context = self._create_context(selected_option)
try:
if should_hooks_run:
self.before_action.run_hooks(selected_option)
selected_option.before_action.run_hooks(selected_option)
run_async(self.hooks.trigger("before", context))
if selected_option.spinner:
result = self._run_action_with_spinner(selected_option)
else:
result = selected_option.action()
result = selected_option()
selected_option.set_result(result)
selected_option.after_action.run_hooks(selected_option)
if should_hooks_run:
self.after_action.run_hooks(selected_option)
context["result"] = result
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("after", context))
except Exception as error:
context["exception"] = error
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("on_error", context))
if "exception" not in context:
logger.info(f"✅ Recovery hook handled error for '{selected_option.description}'")
return True
return self._handle_action_error(selected_option, error)
return selected_option != self.back_option
return True
def run_headless(self, option_key: str, never_confirm: bool | None = None) -> Any:
"""Runs the action of the selected option without displaying the menu."""
@ -470,34 +372,45 @@ class Menu:
selected_option = self.get_option(option_key)
self.last_run_option = selected_option
if not selected_option:
raise MenuError(f"[Headless] Option '{option_key}' not found.")
logger.info("[Headless] Back option selected. Exiting menu.")
return
logger.info(f"[Headless] 🚀 Running: '{selected_option.description}'")
should_hooks_run = self._should_hooks_run(selected_option)
if not self._should_run_action(selected_option):
logger.info(f"[Headless] ⛔ '{selected_option.description}' cancelled.")
raise MenuError(f"[Headless] '{selected_option.description}' cancelled by confirmation.")
context = self._create_context(selected_option)
try:
if should_hooks_run:
self.before_action.run_hooks(selected_option)
selected_option.before_action.run_hooks(selected_option)
run_async(self.hooks.trigger("before", context))
if selected_option.spinner:
result = self._run_action_with_spinner(selected_option)
else:
result = selected_option.action()
result = selected_option()
selected_option.set_result(result)
selected_option.after_action.run_hooks(selected_option)
if should_hooks_run:
self.after_action.run_hooks(selected_option)
context["result"] = result
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("after", context))
logger.info(f"[Headless] ✅ '{selected_option.description}' complete.")
except (KeyboardInterrupt, EOFError):
raise MenuError(f"[Headless] ⚠️ '{selected_option.description}' interrupted by user.")
except Exception as error:
context["exception"] = error
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("on_error", context))
if "exception" not in context:
logger.info(f"[Headless] ✅ Recovery hook handled error for '{selected_option.description}'")
return True
continue_running = self._handle_action_error(selected_option, error)
if not continue_running:
raise MenuError(f"[Headless] ❌ '{selected_option.description}' failed.") from error
return selected_option.get_result()
def run(self) -> None:
@ -517,83 +430,3 @@ class Menu:
logger.info(f"Exiting menu: {self.get_title()}")
if self.exit_message:
self.console.print(self.exit_message)
if __name__ == "__main__":
from rich.traceback import install
from logging_utils import setup_logging
install(show_locals=True)
setup_logging()
def say_hello():
print("Hello!")
def say_goodbye():
print("Goodbye!")
def say_nested():
print("This is a nested menu!")
def my_action():
print("This is my action!")
def long_running_task():
import time
time.sleep(5)
nested_menu = Menu(
Markdown("## Nested Menu", style=OneColors.DARK_YELLOW),
columns=2,
bottom_bar="Menu within a menu",
)
nested_menu.add_option("1", "Say Nested", say_nested, color=OneColors.MAGENTA)
nested_menu.add_before(lambda opt: logger.info(f"Global BEFORE '{opt.description}'"))
nested_menu.add_after(lambda opt: logger.info(f"Global AFTER '{opt.description}'"))
nested_menu.add_option(
"2",
"Test Action",
action=my_action,
before_hooks=[lambda opt: logger.info(f"Option-specific BEFORE '{opt.description}'")],
after_hooks=[lambda opt: logger.info(f"Option-specific AFTER '{opt.description}'")],
)
def bottom_bar():
return (
f"Press Q to quit | Options available: {', '.join([f'[{key}]' for key in menu.options.keys()])}"
)
welcome_message = Markdown("# Welcome to the Menu!")
exit_message = Markdown("# Thank you for using the menu!")
menu = Menu(
Markdown("## Main Menu", style=OneColors.CYAN),
columns=3,
bottom_bar=bottom_bar,
welcome_message=welcome_message,
exit_message=exit_message,
)
menu.add_option("1", "Say Hello", say_hello, color=OneColors.GREEN)
menu.add_option("2", "Say Goodbye", say_goodbye, color=OneColors.LIGHT_RED)
menu.add_option("3", "Do Nothing", lambda: None, color=OneColors.BLUE)
menu.add_submenu("4", "Nested Menu", nested_menu, color=OneColors.MAGENTA)
menu.add_option("5", "Do Nothing", lambda: None, color=OneColors.BLUE)
menu.add_option(
"6",
"Long Running Task",
action=long_running_task,
spinner=True,
spinner_message="Working, please wait...",
spinner_type="moon",
spinner_style=OneColors.GREEN,
spinner_kwargs={"speed": 0.7},
)
menu.update_back_option("Q", "Quit", color=OneColors.DARK_RED)
try:
menu.run()
except EOFError as error:
logger.exception("EOFError: Exiting program.", exc_info=error)
print("Exiting program.")