python_examples/menu/menu.py

433 lines
17 KiB
Python

"""menu.py
This class creates a Menu object that creates a selectable menu
with customizable options and functionality.
It allows for adding options, and their accompanying actions,
and provides a method to display the menu and handle user input.
This class uses the `rich` library to display the menu in a
formatted and visually appealing way.
This class also uses the `prompt_toolkit` library to handle
user input and create an interactive experience.
"""
import asyncio
import logging
from functools import cached_property
from typing import Any, Callable
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.formatted_text import AnyFormattedText
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.shortcuts import confirm
from prompt_toolkit.validation import Validator
from rich import box
from rich.console import Console
from rich.markdown import Markdown
from rich.table import Table
from action import BaseAction
from bottom_bar import BottomBar
from colors import get_nord_theme
from hook_manager import HookManager
from menu_utils import (CaseInsensitiveDict, InvalidActionError, MenuError,
NotAMenuError, OptionAlreadyExistsError, chunks, run_async)
from one_colors import OneColors
from option import Option
logger = logging.getLogger("menu")
class Menu:
"""Class to create a menu with options.
Hook functions must have the signature:
def hook(option: Option) -> None:
where `option` is the selected option.
Error hook functions must have the signature:
def error_hook(option: Option, error: Exception) -> None:
where `option` is the selected option and `error` is the exception raised.
Hook execution order:
1. Before action hooks of the menu.
2. Before action hooks of the selected option.
3. Action of the selected option.
4. After action hooks of the selected option.
5. After action hooks of the menu.
6. On error hooks of the selected option (if an error occurs).
7. On error hooks of the menu (if an error occurs).
Parameters:
title (str|Markdown): The title of the menu.
columns (int): The number of columns to display the options in.
prompt (AnyFormattedText): The prompt to display when asking for input.
bottom_bar (str|callable|None): The text to display in the bottom bar.
"""
def __init__(
self,
title: str | Markdown = "Menu",
prompt: str | AnyFormattedText = "> ",
columns: int = 3,
bottom_bar: str | Callable[[], None] | None = None,
welcome_message: str | Markdown = "",
exit_message: str | Markdown = "",
run_hooks_on_back_option: bool = False,
continue_on_error_prompt: bool = True,
never_confirm: bool = False,
_verbose: bool = False,
) -> None:
"""Initializes the Menu object."""
self.title: str | Markdown = title
self.prompt: str | AnyFormattedText = prompt
self.columns: int = columns
self.bottom_bar: str | Callable[[], None] | None = bottom_bar or BottomBar(columns=columns)
self.options: dict[str, Option] = CaseInsensitiveDict()
self.back_option: Option = self._get_back_option()
self.console: Console = Console(color_system="truecolor", theme=get_nord_theme())
#self.session: PromptSession = self._get_prompt_session()
self.welcome_message: str | Markdown = welcome_message
self.exit_message: str | Markdown = exit_message
self.hooks: HookManager = HookManager()
self.run_hooks_on_back_option: bool = run_hooks_on_back_option
self.continue_on_error_prompt: bool = continue_on_error_prompt
self._never_confirm: bool = never_confirm
self._verbose: bool = _verbose
self.last_run_option: Option | None = None
self.key_bindings: KeyBindings = KeyBindings()
self.toggles: dict[str, str] = {}
def get_title(self) -> str:
"""Returns the string title of the menu."""
if isinstance(self.title, str):
return self.title
elif isinstance(self.title, Markdown):
return self.title.markup
return self.title
def _get_back_option(self) -> Option:
"""Returns the back option for the menu."""
return Option(key="0", description="Back", color=OneColors.DARK_RED)
def _get_completer(self) -> WordCompleter:
"""Completer to provide auto-completion for the menu options."""
return WordCompleter([*self.options.keys(), self.back_option.key], ignore_case=True)
def _get_validator(self) -> Validator:
"""Validator to check if the input is a valid option."""
valid_keys = {key.upper() for key in self.options.keys()} | {self.back_option.key.upper()}
valid_keys_str = ", ".join(sorted(valid_keys))
return Validator.from_callable(
lambda text: text.upper() in valid_keys,
error_message=f"Invalid option. Valid options are: {valid_keys_str}",
move_cursor_to_end=True,
)
def _invalidate_table_cache(self):
"""Forces the table to be recreated on the next access."""
if hasattr(self, "table"):
del self.table
def _refresh_session(self):
"""Refreshes the prompt session to apply any changes."""
self.session.completer = self._get_completer()
self.session.validator = self._get_validator()
self._invalidate_table_cache()
@cached_property
def session(self) -> PromptSession:
"""Returns the prompt session for the menu."""
return PromptSession(
message=self.prompt,
multiline=False,
completer=self._get_completer(),
reserve_space_for_menu=1,
validator=self._get_validator(),
bottom_toolbar=self.bottom_bar.render,
)
def add_toggle(self, key: str, label: str, state: bool = False):
if key in self.options or key in self.toggles:
raise ValueError(f"Key '{key}' is already in use.")
self.toggles[key] = label
self.bottom_bar.add_toggle(label, label, state)
@self.key_bindings.add(key)
def _(event):
current = self.bottom_bar._states[label][1]
self.bottom_bar.update_toggle(label, not current)
self.console.print(f"Toggled [{label}] to {'ON' if not current else 'OFF'}")
def add_counter(self, name: str, label: str, current: int, total: int):
self.bottom_bar.add_counter(name, label, current, total)
def update_counter(self, name: str, current: int | None = None, total: int | None = None):
self.bottom_bar.update_counter(name, current=current, total=total)
def update_toggle(self, name: str, state: bool):
self.bottom_bar.update_toggle(name, state)
def debug_hooks(self) -> None:
if not self._verbose:
return
def hook_names(hook_list):
return [hook.__name__ for hook in hook_list]
logger.debug(f"Menu-level before hooks: {hook_names(self.hooks._hooks['before'])}")
logger.debug(f"Menu-level after hooks: {hook_names(self.hooks._hooks['after'])}")
logger.debug(f"Menu-level error hooks: {hook_names(self.hooks._hooks['on_error'])}")
for key, option in self.options.items():
logger.debug(f"[Option '{key}'] before: {hook_names(option.hooks._hooks['before'])}")
logger.debug(f"[Option '{key}'] after: {hook_names(option.hooks._hooks['after'])}")
logger.debug(f"[Option '{key}'] error: {hook_names(option.hooks._hooks['on_error'])}")
def _validate_option_key(self, key: str) -> None:
"""Validates the option key to ensure it is unique."""
if key.upper() in self.options or key.upper() == self.back_option.key.upper():
raise OptionAlreadyExistsError(f"Option with key '{key}' already exists.")
def update_back_option(
self,
key: str = "0",
description: str = "Back",
action: Callable[[], Any] = lambda: None,
color: str = OneColors.DARK_RED,
confirm: bool = False,
confirm_message: str = "Are you sure?",
) -> None:
"""Updates the back option of the menu."""
if not callable(action):
raise InvalidActionError("Action must be a callable.")
self._validate_option_key(key)
self.back_option = Option(
key=key,
description=description,
action=action,
color=color,
confirm=confirm,
confirm_message=confirm_message,
)
self._refresh_session()
def add_submenu(self, key: str, description: str, submenu: "Menu", color: str = OneColors.CYAN) -> None:
"""Adds a submenu to the menu."""
if not isinstance(submenu, Menu):
raise NotAMenuError("submenu must be an instance of Menu.")
self._validate_option_key(key)
self.add_option(key, description, submenu.run, color)
self._refresh_session()
def add_options(self, options: list[dict]) -> None:
"""Adds multiple options to the menu."""
for option in options:
self.add_option(**option)
def add_option(
self,
key: str,
description: str,
action: BaseAction | Callable[[], Any],
color: str = OneColors.WHITE,
confirm: bool = False,
confirm_message: str = "Are you sure?",
spinner: bool = False,
spinner_message: str = "Processing...",
spinner_type: str = "dots",
spinner_style: str = OneColors.CYAN,
spinner_kwargs: dict[str, Any] | None = None,
before_hooks: list[Callable] | None = None,
after_hooks: list[Callable] | None = None,
error_hooks: list[Callable] | None = None,
) -> Option:
"""Adds an option to the menu, preventing duplicates."""
spinner_kwargs: dict[str, Any] = spinner_kwargs or {}
self._validate_option_key(key)
option = Option(
key=key,
description=description,
action=action,
color=color,
confirm=confirm,
confirm_message=confirm_message,
spinner=spinner,
spinner_message=spinner_message,
spinner_type=spinner_type,
spinner_style=spinner_style,
spinner_kwargs=spinner_kwargs,
)
for hook in before_hooks or []:
option.hooks.register("before", hook)
for hook in after_hooks or []:
option.hooks.register("after", hook)
for hook in error_hooks or []:
option.hooks.register("on_error", hook)
self.options[key] = option
self._refresh_session()
return option
@cached_property
def table(self) -> Table:
"""Creates a rich table to display the menu options."""
table = Table(title=self.title, show_header=False, box=box.SIMPLE, expand=True)
for chunk in chunks(self.options.items(), self.columns):
row = []
for key, option in chunk:
row.append(f"[{key}] [{option.color}]{option.description}")
table.add_row(*row)
table.add_row(f"[{self.back_option.key}] [{self.back_option.color}]{self.back_option.description}")
return table
def get_option(self, choice: str) -> Option | None:
"""Returns the selected option based on user input."""
if choice.upper() == self.back_option.key.upper():
return self.back_option
return self.options.get(choice)
def _should_run_action(self, selected_option: Option) -> bool:
if selected_option.confirm and not self._never_confirm:
return confirm(selected_option.confirm_message)
return True
def _create_context(self, selected_option: Option) -> dict[str, Any]:
"""Creates a context dictionary for the selected option."""
return {
"name": selected_option.description,
"option": selected_option,
"args": (),
"kwargs": {},
}
def _run_action_with_spinner(self, option: Option) -> Any:
"""Runs the action of the selected option with a spinner."""
with self.console.status(
option.spinner_message,
spinner=option.spinner_type,
spinner_style=option.spinner_style,
**option.spinner_kwargs,
):
return option()
def _handle_action_error(self, selected_option: Option, error: Exception) -> bool:
"""Handles errors that occur during the action of the selected option."""
logger.exception(f"Error executing '{selected_option.description}': {error}")
self.console.print(f"[{OneColors.DARK_RED}]An error occurred while executing "
f"{selected_option.description}:[/] {error}")
if self.continue_on_error_prompt and not self._never_confirm:
return confirm("An error occurred. Do you wish to continue?")
if self._never_confirm:
return True
return False
def process_action(self) -> bool:
"""Processes the action of the selected option."""
choice = self.session.prompt()
selected_option = self.get_option(choice)
self.last_run_option = selected_option
if selected_option == self.back_option:
logger.info(f"🔙 Back selected: exiting {self.get_title()}")
return False
if not self._should_run_action(selected_option):
logger.info(f"[{OneColors.DARK_RED}] {selected_option.description} cancelled.")
return True
context = self._create_context(selected_option)
try:
run_async(self.hooks.trigger("before", context))
if selected_option.spinner:
result = self._run_action_with_spinner(selected_option)
else:
result = selected_option()
selected_option.set_result(result)
context["result"] = result
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("after", context))
except Exception as error:
context["exception"] = error
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("on_error", context))
if "exception" not in context:
logger.info(f"✅ Recovery hook handled error for '{selected_option.description}'")
return True
return self._handle_action_error(selected_option, error)
return True
def run_headless(self, option_key: str, never_confirm: bool | None = None) -> Any:
"""Runs the action of the selected option without displaying the menu."""
self.debug_hooks()
if never_confirm is not None:
self._never_confirm = never_confirm
selected_option = self.get_option(option_key)
self.last_run_option = selected_option
if not selected_option:
logger.info("[Headless] Back option selected. Exiting menu.")
return
logger.info(f"[Headless] 🚀 Running: '{selected_option.description}'")
if not self._should_run_action(selected_option):
raise MenuError(f"[Headless] '{selected_option.description}' cancelled by confirmation.")
context = self._create_context(selected_option)
try:
run_async(self.hooks.trigger("before", context))
if selected_option.spinner:
result = self._run_action_with_spinner(selected_option)
else:
result = selected_option()
selected_option.set_result(result)
context["result"] = result
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("after", context))
logger.info(f"[Headless] ✅ '{selected_option.description}' complete.")
except (KeyboardInterrupt, EOFError):
raise MenuError(f"[Headless] ⚠️ '{selected_option.description}' interrupted by user.")
except Exception as error:
context["exception"] = error
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("on_error", context))
if "exception" not in context:
logger.info(f"[Headless] ✅ Recovery hook handled error for '{selected_option.description}'")
return True
continue_running = self._handle_action_error(selected_option, error)
if not continue_running:
raise MenuError(f"[Headless] ❌ '{selected_option.description}' failed.") from error
return selected_option.get_result()
def run(self) -> None:
"""Runs the menu and handles user input."""
logger.info(f"Running menu: {self.get_title()}")
self.debug_hooks()
if self.welcome_message:
self.console.print(self.welcome_message)
while True:
self.console.print(self.table)
try:
if not self.process_action():
break
except (EOFError, KeyboardInterrupt):
logger.info(f"[{OneColors.DARK_RED}]EOF or KeyboardInterrupt. Exiting menu.")
break
logger.info(f"Exiting menu: {self.get_title()}")
if self.exit_message:
self.console.print(self.exit_message)