This commit is contained in:
Roland Thomas Jr 2025-03-24 22:14:39 -04:00
parent 367a3d9523
commit 03af4f8077
Signed by: roland
GPG Key ID: 7C3C2B085A4C2872
8 changed files with 983 additions and 8 deletions

73
menu/callbacks.py Normal file
View File

@ -0,0 +1,73 @@
import asyncio
import functools
import inspect
import logging
import random
import time
from logging_utils import setup_logging
from rich.console import Console
console = Console()
setup_logging()
logger = logging.getLogger("menu")
def retry(max_retries=3, delay=1, backoff=2, exceptions=(Exception,), logger=None, spinner_text=None):
def decorator(func):
is_coroutine = inspect.iscoroutinefunction(func)
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
retries, current_delay = 0, delay
while retries <= max_retries:
if logger:
logger.debug(f"Retrying {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{exceptions}'.")
try:
with console.status(spinner_text, spinner="dots"):
return await func(*args, **kwargs)
except exceptions as e:
if retries == max_retries:
if logger:
logger.exception(f"❌ Max retries reached for '{func.__name__}': {e}")
raise
if logger:
logger.warning(
f"🔄 Retry {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{e}'."
)
await asyncio.sleep(current_delay)
retries += 1
current_delay *= backoff
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
retries, current_delay = 0, delay
while retries <= max_retries:
if logger:
logger.debug(f"Retrying {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{exceptions}'.")
try:
with console.status(spinner_text, spinner="dots"):
return func(*args, **kwargs)
except exceptions as e:
if retries == max_retries:
if logger:
logger.exception(f"❌ Max retries reached for '{func.__name__}': {e}")
raise
if logger:
logger.warning(
f"🔄 Retry {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{e}'."
)
time.sleep(current_delay)
retries += 1
current_delay *= backoff
return async_wrapper if is_coroutine else sync_wrapper
return decorator
@retry(max_retries=10, delay=1, logger=logger, spinner_text="Trying risky thing...")
def might_fail():
time.sleep(4)
if random.random() < 0.6:
raise ValueError("Simulated failure")
return "🎉 Success!"
result = might_fail()
print(result)

View File

@ -1,5 +1,5 @@
""" """
nord_theme.py colors.py
A Python module that integrates the Nord color palette with the Rich library. A Python module that integrates the Nord color palette with the Rich library.
It defines a metaclass-based NordColors class allowing dynamic attribute lookups It defines a metaclass-based NordColors class allowing dynamic attribute lookups
@ -9,7 +9,7 @@ Theme that customizes Rich's default styles.
Features: Features:
- All core Nord colors (NORD0 through NORD15), plus named aliases (Polar Night, - All core Nord colors (NORD0 through NORD15), plus named aliases (Polar Night,
Snow Storm, Frost, Aurora). Snow Storm, Frost, Aurora).
- A dynamic metaclass (NordMeta) that enables usage of 'NORD1b', 'NORD1_biu', etc. - A dynamic metaclass (NordMeta) that enables usage of 'NORD1b', 'NORD1_biudrs', etc.
to return color + bold/italic/underline/dim/reverse/strike flags for Rich. to return color + bold/italic/underline/dim/reverse/strike flags for Rich.
- A ready-to-use Theme (get_nord_theme) mapping Rich's default styles to Nord colors. - A ready-to-use Theme (get_nord_theme) mapping Rich's default styles to Nord colors.
@ -25,9 +25,9 @@ from rich.theme import Theme
from rich.console import Console from rich.console import Console
class NordMeta(type): class ColorsMeta(type):
""" """
A metaclass that catches attribute lookups like `NORD12bui` or `ORANGE_b` and returns A metaclass that catches attribute lookups like `NORD12buidrs` or `ORANGE_b` and returns
a string combining the base color + bold/italic/underline/dim/reverse/strike flags. a string combining the base color + bold/italic/underline/dim/reverse/strike flags.
""" """
_STYLE_MAP = { _STYLE_MAP = {
@ -38,7 +38,7 @@ class NordMeta(type):
"r": "reverse", "r": "reverse",
"s": "strike", "s": "strike",
} }
_cache = {} _cache: dict = {}
def __getattr__(cls, name: str) -> str: def __getattr__(cls, name: str) -> str:
""" """
@ -57,7 +57,7 @@ class NordMeta(type):
raise AttributeError( raise AttributeError(
f"'{cls.__name__}' has no attribute '{name}'.\n" f"'{cls.__name__}' has no attribute '{name}'.\n"
f"Expected format: BASE[_]?FLAGS, where BASE is uppercase letters/underscores/digits, " f"Expected format: BASE[_]?FLAGS, where BASE is uppercase letters/underscores/digits, "
f"and FLAGS ∈ {{'b', 'i', 'u'}}." f"and FLAGS ∈ {{'b', 'i', 'u', 'd', 'r', 's'}}."
) )
base, suffix = match.groups() base, suffix = match.groups()
@ -104,7 +104,35 @@ class NordMeta(type):
return style_string return style_string
class NordColors(metaclass=NordMeta): class OneColors(metaclass=ColorsMeta):
BLACK = "#282C34"
GUTTER_GREY = "#4B5263"
COMMENT_GREY = "#5C6370"
WHITE = "#ABB2BF"
DARK_RED = "#BE5046"
LIGHT_RED = "#E06C75"
DARK_YELLOW = "#D19A66"
LIGHT_YELLOW = "#E5C07B"
GREEN = "#98C379"
CYAN = "#56B6C2"
BLUE = "#61AFEF"
MAGENTA = "#C678DD"
@classmethod
def as_dict(cls):
"""
Returns a dictionary mapping every NORD* attribute
(e.g. 'NORD0') to its hex code.
"""
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if not callable(getattr(cls, attr)) and
not attr.startswith("__")
}
class NordColors(metaclass=ColorsMeta):
""" """
Defines the Nord color palette as class attributes. Defines the Nord color palette as class attributes.
@ -172,7 +200,8 @@ class NordColors(metaclass=NordMeta):
return { return {
attr: getattr(cls, attr) attr: getattr(cls, attr)
for attr in dir(cls) for attr in dir(cls)
if attr.startswith("NORD") and not callable(getattr(cls, attr)) if attr.startswith("NORD") and
not callable(getattr(cls, attr))
} }
@classmethod @classmethod

168
menu/hooks.py Normal file
View File

@ -0,0 +1,168 @@
import time
import logging
import random
import functools
from menu import Menu, Option
logger = logging.getLogger("menu")
def timing_before_hook(option: Option) -> None:
option._start_time = time.perf_counter()
def timing_after_hook(option: Option) -> None:
option._end_time = time.perf_counter()
option._duration = option._end_time - option._start_time
def timing_error_hook(option: Option, _: Exception) -> None:
option._end_time = time.perf_counter()
option._duration = option._end_time - option._start_time
def log_before(option: Option) -> None:
logger.info(f"🚀 Starting action '{option.description}' (key='{option.key}')")
def log_after(option: Option) -> None:
if option._duration is not None:
logger.info(f"✅ Completed '{option.description}' (key='{option.key}') in {option._duration:.2f}s")
else:
logger.info(f"✅ Completed '{option.description}' (key='{option.key}')")
def log_error(option: Option, error: Exception) -> None:
if option._duration is not None:
logger.error(f"❌ Error '{option.description}' (key='{option.key}') after {option._duration:.2f}s: {error}")
else:
logger.error(f"❌ Error '{option.description}' (key='{option.key}'): {error}")
class CircuitBreakerOpen(Exception):
"""Exception raised when the circuit breaker is open."""
class CircuitBreaker:
def __init__(self, max_failures=3, reset_timeout=10):
self.max_failures = max_failures
self.reset_timeout = reset_timeout
self.failures = 0
self.open_until = None
def before_hook(self, option: Option):
if self.open_until:
if time.time() < self.open_until:
raise CircuitBreakerOpen(f"🔴 Circuit open for '{option.description}' until {time.ctime(self.open_until)}.")
else:
logger.info(f"🟢 Circuit closed again for '{option.description}'.")
self.failures = 0
self.open_until = None
def error_hook(self, option: Option, error: Exception):
self.failures += 1
logger.warning(f"⚠️ CircuitBreaker: '{option.description}' failure {self.failures}/{self.max_failures}.")
if self.failures >= self.max_failures:
self.open_until = time.time() + self.reset_timeout
logger.error(f"🔴 Circuit opened for '{option.description}' until {time.ctime(self.open_until)}.")
def after_hook(self, option: Option):
self.failures = 0
def is_open(self):
return self.open_until is not None and time.time() < self.open_until
def reset(self):
self.failures = 0
self.open_until = None
logger.info("🔄 Circuit reset.")
class RetryHandler:
def __init__(self, max_retries=2, delay=1, backoff=2):
self.max_retries = max_retries
self.delay = delay
self.backoff = backoff
def retry_on_error(self, option: Option, error: Exception):
retries_done = 0
current_delay = self.delay
last_error = error
while retries_done < self.max_retries:
try:
retries_done += 1
logger.info(f"🔄 Retrying '{option.description}' ({retries_done}/{self.max_retries}) in {current_delay}s due to '{error}'...")
time.sleep(current_delay)
result = option.action()
print(result)
option.set_result(result)
logger.info(f"✅ Retry succeeded for '{option.description}' on attempt {retries_done}.")
option.after_action.run_hooks(option)
return
except Exception as retry_error:
logger.warning(f"⚠️ Retry attempt {retries_done} for '{option.description}' failed due to '{retry_error}'.")
last_error = retry_error
current_delay *= self.backoff
logger.exception(f"'{option.description}' failed after {self.max_retries} retries.")
raise last_error
def retry(max_retries=3, delay=1, backoff=2, exceptions=(Exception,), logger=None):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
retries, current_delay = 0, delay
while retries <= max_retries:
try:
return func(*args, **kwargs)
except exceptions as e:
if retries == max_retries:
if logger:
logger.exception(f"❌ Max retries reached for '{func.__name__}': {e}")
raise
if logger:
logger.warning(
f"🔄 Retry {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{e}'."
)
time.sleep(current_delay)
retries += 1
current_delay *= backoff
return wrapper
return decorator
def setup_hooks(menu):
menu.add_before(timing_before_hook)
menu.add_after(timing_after_hook)
menu.add_on_error(timing_error_hook)
menu.add_before(log_before)
menu.add_after(log_after)
menu.add_on_error(log_error)
if __name__ == "__main__":
def risky_task():
if random.random() > 0.1:
time.sleep(1)
raise ValueError("Random failure occurred")
print("Task succeeded!")
breaker = CircuitBreaker(max_failures=2, reset_timeout=10)
retry_handler = RetryHandler(max_retries=30, delay=2, backoff=2)
menu = Menu(never_confirm=True)
menu.add_before(timing_before_hook)
menu.add_after(timing_after_hook)
menu.add_on_error(timing_error_hook)
menu.add_before(log_before)
menu.add_after(log_after)
menu.add_on_error(log_error)
menu.add_option(
key="CR",
description="Retry with CircuitBreaker",
action=risky_task,
before_hooks=[breaker.before_hook],
after_hooks=[breaker.after_hook],
error_hooks=[retry_handler.retry_on_error, breaker.error_hook],
)
menu.run()

40
menu/logging_utils.py Normal file
View File

@ -0,0 +1,40 @@
import logging
from rich.logging import RichHandler
def setup_logging(
log_filename: str = "menu.log",
console_log_level: int = logging.DEBUG,
file_log_level: int = logging.DEBUG,
):
"""Set up logging configuration with separate console and file handlers."""
root_logger = logging.getLogger()
root_logger.setLevel(logging.WARNING)
if root_logger.hasHandlers():
root_logger.handlers.clear()
console_handler = RichHandler(
rich_tracebacks=True,
show_time=True,
show_level=True,
show_path=False,
markup=True,
log_time_format="[%Y-%m-%d %H:%M:%S]",
)
console_handler.setLevel(console_log_level)
file_handler = logging.FileHandler(log_filename)
file_handler.setLevel(file_log_level)
file_formatter = logging.Formatter(
"%(asctime)s [%(name)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(file_formatter)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
menu_logger = logging.getLogger("menu")
menu_logger.setLevel(console_log_level)
menu_logger.propagate = True

29
menu/main.py Normal file
View File

@ -0,0 +1,29 @@
import logging
from rich.traceback import install
from logging_utils import setup_logging
from menu import Menu
from hooks import setup_hooks, CircuitBreaker, RetryHandler
from task import risky_task
install(show_locals=True, width=120)
setup_logging()
logger = logging.getLogger("menu")
menu = Menu(title="Main Menu", never_confirm=True)
setup_hooks(menu)
breaker = CircuitBreaker(max_failures=2, reset_timeout=10)
retry_handler = RetryHandler(max_retries=30, delay=2, backoff=2)
menu.add_option(
"1",
"Run Risky Task",
risky_task,
before_hooks=[breaker.before_hook],
after_hooks=[breaker.after_hook],
error_hooks=[retry_handler.retry_on_error, breaker.error_hook],
)
if __name__ == "__main__":
result = menu.run_headless("1")
logger.info(f"Headless execution returned: {result}")

599
menu/menu.py Normal file
View File

@ -0,0 +1,599 @@
"""menu.py
This class creates a Menu object that creates a selectable menu
with customizable options and functionality.
It allows for adding options, and their accompanying actions,
and provides a method to display the menu and handle user input.
This class uses the `rich` library to display the menu in a
formatted and visually appealing way.
This class also uses the `prompt_toolkit` library to handle
user input and create an interactive experience.
"""
import logging
from functools import cached_property
from itertools import islice
from typing import Any, Callable
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.formatted_text import AnyFormattedText
from prompt_toolkit.shortcuts import confirm
from prompt_toolkit.validation import Validator
from pydantic import BaseModel, Field, field_validator, PrivateAttr
from rich import box
from rich.console import Console
from rich.markdown import Markdown
from rich.table import Table
from colors import get_nord_theme
from one_colors import OneColors
logger = logging.getLogger("menu")
def chunks(iterator, size):
"""Yield successive n-sized chunks from an iterator."""
iterator = iter(iterator)
while True:
chunk = list(islice(iterator, size))
if not chunk:
break
yield chunk
class MenuError(Exception):
"""Custom exception for the Menu class."""
class OptionAlreadyExistsError(MenuError):
"""Exception raised when an option with the same key already exists in the menu."""
class InvalidHookError(MenuError):
"""Exception raised when a hook is not callable."""
class InvalidActionError(MenuError):
"""Exception raised when an action is not callable."""
class NotAMenuError(MenuError):
"""Exception raised when the provided submenu is not an instance of Menu."""
class CaseInsensitiveDict(dict):
"""A case-insensitive dictionary that treats all keys as uppercase."""
def __setitem__(self, key, value):
super().__setitem__(key.upper(), value)
def __getitem__(self, key):
return super().__getitem__(key.upper())
def __contains__(self, key):
return super().__contains__(key.upper())
def get(self, key, default=None):
return super().get(key.upper(), default)
def pop(self, key, default=None):
return super().pop(key.upper(), default)
def update(self, other=None, **kwargs):
if other:
other = {k.upper(): v for k, v in other.items()}
kwargs = {k.upper(): v for k, v in kwargs.items()}
super().update(other, **kwargs)
class Hooks(BaseModel):
"""Class to manage hooks for the menu and options."""
hooks: list[Callable[["Option"], None]] | list[Callable[["Option", Exception], None]] = Field(
default_factory=list
)
@field_validator("hooks", mode="before")
@classmethod
def validate_hooks(cls, hooks):
if hooks is None:
return []
if not all(callable(hook) for hook in hooks):
raise InvalidHookError("All hooks must be callable.")
return hooks
def add_hook(self, hook: Callable[["Option"], None] | Callable[["Option", Exception], None]) -> None:
"""Add a hook to the list."""
if not callable(hook):
raise InvalidHookError("Hook must be a callable.")
if hook not in self.hooks:
self.hooks.append(hook)
def run_hooks(self, *args, **kwargs) -> None:
"""Run all hooks with the given arguments."""
for hook in self.hooks:
try:
hook(*args, **kwargs)
except Exception as hook_error:
logger.exception(f"Hook '{hook.__name__}': {hook_error}")
class Option(BaseModel):
"""Class representing an option in the menu.
Hooks must have the signature:
def hook(option: Option) -> None:
where `option` is the selected option.
Error hooks must have the signature:
def error_hook(option: Option, error: Exception) -> None:
where `option` is the selected option and `error` is the exception raised.
"""
key: str
description: str
action: Callable[[], Any] = lambda: None
color: str = OneColors.WHITE
confirm: bool = False
confirm_message: str = "Are you sure?"
spinner: bool = False
spinner_message: str = "Processing..."
spinner_type: str = "dots"
spinner_style: str = OneColors.CYAN
spinner_kwargs: dict[str, Any] = Field(default_factory=dict)
before_action: Hooks = Field(default_factory=Hooks)
after_action: Hooks = Field(default_factory=Hooks)
on_error: Hooks = Field(default_factory=Hooks)
_start_time: float | None = PrivateAttr(default=None)
_end_time: float | None = PrivateAttr(default=None)
_duration: float | None = PrivateAttr(default=None)
_result: Any | None = PrivateAttr(default=None)
def __str__(self):
return f"Option(key='{self.key}', description='{self.description}')"
def set_result(self, result: Any) -> None:
"""Set the result of the action."""
self._result = result
def get_result(self) -> Any:
"""Get the result of the action."""
return self._result
@field_validator("action")
def validate_action(cls, action):
if not callable(action):
raise InvalidActionError("Action must be a callable.")
return action
class Menu:
"""Class to create a menu with options.
Hook functions must have the signature:
def hook(option: Option) -> None:
where `option` is the selected option.
Error hook functions must have the signature:
def error_hook(option: Option, error: Exception) -> None:
where `option` is the selected option and `error` is the exception raised.
Hook execution order:
1. Before action hooks of the menu.
2. Before action hooks of the selected option.
3. Action of the selected option.
4. After action hooks of the selected option.
5. After action hooks of the menu.
6. On error hooks of the selected option (if an error occurs).
7. On error hooks of the menu (if an error occurs).
Parameters:
title (str|Markdown): The title of the menu.
columns (int): The number of columns to display the options in.
prompt (AnyFormattedText): The prompt to display when asking for input.
bottom_bar (str|callable|None): The text to display in the bottom bar.
"""
def __init__(
self,
title: str | Markdown = "Menu",
prompt: str | AnyFormattedText = "> ",
columns: int = 3,
bottom_bar: str | Callable[[], None] | None = None,
welcome_message: str | Markdown = "",
exit_message: str | Markdown = "",
run_hooks_on_back_option: bool = False,
continue_on_error_prompt: bool = True,
never_confirm: bool = False,
_verbose: bool = False,
) -> None:
"""Initializes the Menu object."""
self.title: str | Markdown = title
self.prompt: str | AnyFormattedText = prompt
self.columns: int = columns
self.bottom_bar: str | Callable[[], None] | None = bottom_bar
self.options: dict[str, Option] = CaseInsensitiveDict()
self.back_option: Option = self._get_back_option()
self.console: Console = Console(color_system="truecolor", theme=get_nord_theme())
self.session: PromptSession = self._get_prompt_session()
self.welcome_message: str | Markdown = welcome_message
self.exit_message: str | Markdown = exit_message
self.before_action: Hooks = Hooks()
self.after_action: Hooks = Hooks()
self.on_error: Hooks = Hooks()
self.run_hooks_on_back_option: bool = run_hooks_on_back_option
self.continue_on_error_prompt: bool = continue_on_error_prompt
self._never_confirm: bool = never_confirm
self._verbose: bool = _verbose
self.last_run_option: Option | None = None
def get_title(self) -> str:
"""Returns the string title of the menu."""
if isinstance(self.title, str):
return self.title
elif isinstance(self.title, Markdown):
return self.title.markup
return self.title
def _get_back_option(self) -> Option:
"""Returns the back option for the menu."""
return Option(key="0", description="Back", color=OneColors.DARK_RED)
def _get_completer(self) -> WordCompleter:
"""Completer to provide auto-completion for the menu options."""
return WordCompleter([*self.options.keys(), self.back_option.key], ignore_case=True)
def _get_validator(self) -> Validator:
"""Validator to check if the input is a valid option."""
valid_keys = {key.upper() for key in self.options.keys()} | {self.back_option.key.upper()}
valid_keys_str = ", ".join(sorted(valid_keys))
return Validator.from_callable(
lambda text: text.upper() in valid_keys,
error_message=f"Invalid option. Valid options are: {valid_keys_str}",
move_cursor_to_end=True,
)
def _invalidate_table_cache(self):
"""Forces the table to be recreated on the next access."""
if hasattr(self, "table"):
del self.table
def _refresh_session(self):
"""Refreshes the prompt session to apply any changes."""
self.session.completer = self._get_completer()
self.session.validator = self._get_validator()
self._invalidate_table_cache()
def _get_prompt_session(self) -> PromptSession:
"""Returns the prompt session for the menu."""
return PromptSession(
message=self.prompt,
multiline=False,
completer=self._get_completer(),
reserve_space_for_menu=1,
validator=self._get_validator(),
bottom_toolbar=self.bottom_bar,
)
def add_before(self, hook: Callable[["Option"], None]) -> None:
"""Adds a hook to be executed before the action of the menu."""
self.before_action.add_hook(hook)
def add_after(self, hook: Callable[["Option"], None]) -> None:
"""Adds a hook to be executed after the action of the menu."""
self.after_action.add_hook(hook)
def add_on_error(self, hook: Callable[["Option", Exception], None]) -> None:
"""Adds a hook to be executed on error of the menu."""
self.on_error.add_hook(hook)
def debug_hooks(self) -> None:
if not self._verbose:
return
logger.debug(f"Menu-level before hooks: {[hook.__name__ for hook in self.before_action.hooks]}")
logger.debug(f"Menu-level after hooks: {[hook.__name__ for hook in self.after_action.hooks]}")
logger.debug(f"Menu-level error hooks: {[hook.__name__ for hook in self.on_error.hooks]}")
for key, option in self.options.items():
logger.debug(f"[Option '{key}'] before: {[hook.__name__ for hook in option.before_action.hooks]}")
logger.debug(f"[Option '{key}'] after: {[hook.__name__ for hook in option.after_action.hooks]}")
logger.debug(f"[Option '{key}'] error: {[hook.__name__ for hook in option.on_error.hooks]}")
def _validate_option_key(self, key: str) -> None:
"""Validates the option key to ensure it is unique."""
if key in self.options or key.upper() == self.back_option.key.upper():
raise OptionAlreadyExistsError(f"Option with key '{key}' already exists.")
def update_back_option(
self,
key: str = "0",
description: str = "Back",
action: Callable[[], Any] = lambda: None,
color: str = OneColors.DARK_RED,
confirm: bool = False,
confirm_message: str = "Are you sure?",
) -> None:
"""Updates the back option of the menu."""
if not callable(action):
raise InvalidActionError("Action must be a callable.")
self._validate_option_key(key)
self.back_option = Option(
key=key,
description=description,
action=action,
color=color,
confirm=confirm,
confirm_message=confirm_message,
)
self._refresh_session()
def add_submenu(self, key: str, description: str, submenu: "Menu", color: str = OneColors.CYAN) -> None:
"""Adds a submenu to the menu."""
if not isinstance(submenu, Menu):
raise NotAMenuError("submenu must be an instance of Menu.")
self._validate_option_key(key)
self.add_option(key, description, submenu.run, color)
self._refresh_session()
def add_options(self, options: list[dict]) -> None:
"""Adds multiple options to the menu."""
for option in options:
self.add_option(**option)
def add_option(
self,
key: str,
description: str,
action: Callable[[], Any],
color: str = OneColors.WHITE,
confirm: bool = False,
confirm_message: str = "Are you sure?",
spinner: bool = False,
spinner_message: str = "Processing...",
spinner_type: str = "dots",
spinner_style: str = OneColors.CYAN,
spinner_kwargs: dict[str, Any] = None,
before_hooks: list[Callable[[Option], None]] = None,
after_hooks: list[Callable[[Option], None]] = None,
error_hooks: list[Callable[[Option, Exception], None]] = None,
) -> Option:
"""Adds an option to the menu, preventing duplicates."""
self._validate_option_key(key)
if not spinner_kwargs:
spinner_kwargs = {}
option = Option(
key=key,
description=description,
action=action,
color=color,
confirm=confirm,
confirm_message=confirm_message,
spinner=spinner,
spinner_message=spinner_message,
spinner_type=spinner_type,
spinner_style=spinner_style,
spinner_kwargs=spinner_kwargs,
before_action=Hooks(hooks=before_hooks),
after_action=Hooks(hooks=after_hooks),
on_error=Hooks(hooks=error_hooks),
)
self.options[key] = option
self._refresh_session()
return option
@cached_property
def table(self) -> Table:
"""Creates a rich table to display the menu options."""
table = Table(title=self.title, show_header=False, box=box.SIMPLE, expand=True)
for chunk in chunks(self.options.items(), self.columns):
row = []
for key, option in chunk:
row.append(f"[{key}] [{option.color}]{option.description}")
table.add_row(*row)
table.add_row(f"[{self.back_option.key}] [{self.back_option.color}]{self.back_option.description}")
return table
def get_option(self, choice: str) -> Option | None:
"""Returns the selected option based on user input."""
if choice.upper() == self.back_option.key.upper():
return self.back_option
return self.options.get(choice)
def _should_hooks_run(self, selected_option: Option) -> bool:
"""Determines if hooks should be run based on the selected option."""
return selected_option != self.back_option or self.run_hooks_on_back_option
def _should_run_action(self, selected_option: Option) -> bool:
if selected_option.confirm and not self._never_confirm:
return confirm(selected_option.confirm_message)
return True
def _run_action_with_spinner(self, option: Option) -> Any:
"""Runs the action of the selected option with a spinner."""
with self.console.status(
option.spinner_message,
spinner=option.spinner_type,
spinner_style=option.spinner_style,
**option.spinner_kwargs,
):
return option.action()
def _handle_action_error(self, selected_option: Option, error: Exception) -> bool:
"""Handles errors that occur during the action of the selected option."""
logger.exception(f"Error executing '{selected_option.description}': {error}")
self.console.print(f"[{OneColors.DARK_RED}]An error occurred while executing "
f"{selected_option.description}:[/] {error}")
selected_option.on_error.run_hooks(selected_option, error)
self.on_error.run_hooks(selected_option, error)
if self.continue_on_error_prompt and not self._never_confirm:
return confirm("An error occurred. Do you wish to continue?")
if self._never_confirm:
return True
return False
def process_action(self) -> bool:
"""Processes the action of the selected option."""
choice = self.session.prompt()
selected_option = self.get_option(choice)
self.last_run_option = selected_option
should_hooks_run = self._should_hooks_run(selected_option)
if not self._should_run_action(selected_option):
logger.info(f"[{OneColors.DARK_RED}] {selected_option.description} cancelled.")
return True
try:
if should_hooks_run:
self.before_action.run_hooks(selected_option)
selected_option.before_action.run_hooks(selected_option)
if selected_option.spinner:
result = self._run_action_with_spinner(selected_option)
else:
result = selected_option.action()
selected_option.set_result(result)
selected_option.after_action.run_hooks(selected_option)
if should_hooks_run:
self.after_action.run_hooks(selected_option)
except Exception as error:
return self._handle_action_error(selected_option, error)
return selected_option != self.back_option
def run_headless(self, option_key: str, never_confirm: bool | None = None) -> Any:
"""Runs the action of the selected option without displaying the menu."""
self.debug_hooks()
if never_confirm is not None:
self._never_confirm = never_confirm
selected_option = self.get_option(option_key)
self.last_run_option = selected_option
if not selected_option:
raise MenuError(f"[Headless] Option '{option_key}' not found.")
logger.info(f"[Headless] 🚀 Running: '{selected_option.description}'")
should_hooks_run = self._should_hooks_run(selected_option)
if not self._should_run_action(selected_option):
logger.info(f"[Headless] ⛔ '{selected_option.description}' cancelled.")
raise MenuError(f"[Headless] '{selected_option.description}' cancelled by confirmation.")
try:
if should_hooks_run:
self.before_action.run_hooks(selected_option)
selected_option.before_action.run_hooks(selected_option)
if selected_option.spinner:
result = self._run_action_with_spinner(selected_option)
else:
result = selected_option.action()
selected_option.set_result(result)
selected_option.after_action.run_hooks(selected_option)
if should_hooks_run:
self.after_action.run_hooks(selected_option)
logger.info(f"[Headless] ✅ '{selected_option.description}' complete.")
except (KeyboardInterrupt, EOFError):
raise MenuError(f"[Headless] ⚠️ '{selected_option.description}' interrupted by user.")
except Exception as error:
continue_running = self._handle_action_error(selected_option, error)
if not continue_running:
raise MenuError(f"[Headless] ❌ '{selected_option.description}' failed.") from error
return selected_option.get_result()
def run(self) -> None:
"""Runs the menu and handles user input."""
logger.info(f"Running menu: {self.get_title()}")
self.debug_hooks()
if self.welcome_message:
self.console.print(self.welcome_message)
while True:
self.console.print(self.table)
try:
if not self.process_action():
break
except (EOFError, KeyboardInterrupt):
logger.info(f"[{OneColors.DARK_RED}]EOF or KeyboardInterrupt. Exiting menu.")
break
logger.info(f"Exiting menu: {self.get_title()}")
if self.exit_message:
self.console.print(self.exit_message)
if __name__ == "__main__":
from rich.traceback import install
from logging_utils import setup_logging
install(show_locals=True)
setup_logging()
def say_hello():
print("Hello!")
def say_goodbye():
print("Goodbye!")
def say_nested():
print("This is a nested menu!")
def my_action():
print("This is my action!")
def long_running_task():
import time
time.sleep(5)
nested_menu = Menu(
Markdown("## Nested Menu", style=OneColors.DARK_YELLOW),
columns=2,
bottom_bar="Menu within a menu",
)
nested_menu.add_option("1", "Say Nested", say_nested, color=OneColors.MAGENTA)
nested_menu.add_before(lambda opt: logger.info(f"Global BEFORE '{opt.description}'"))
nested_menu.add_after(lambda opt: logger.info(f"Global AFTER '{opt.description}'"))
nested_menu.add_option(
"2",
"Test Action",
action=my_action,
before_hooks=[lambda opt: logger.info(f"Option-specific BEFORE '{opt.description}'")],
after_hooks=[lambda opt: logger.info(f"Option-specific AFTER '{opt.description}'")],
)
def bottom_bar():
return (
f"Press Q to quit | Options available: {', '.join([f'[{key}]' for key in menu.options.keys()])}"
)
welcome_message = Markdown("# Welcome to the Menu!")
exit_message = Markdown("# Thank you for using the menu!")
menu = Menu(
Markdown("## Main Menu", style=OneColors.CYAN),
columns=3,
bottom_bar=bottom_bar,
welcome_message=welcome_message,
exit_message=exit_message,
)
menu.add_option("1", "Say Hello", say_hello, color=OneColors.GREEN)
menu.add_option("2", "Say Goodbye", say_goodbye, color=OneColors.LIGHT_RED)
menu.add_option("3", "Do Nothing", lambda: None, color=OneColors.BLUE)
menu.add_submenu("4", "Nested Menu", nested_menu, color=OneColors.MAGENTA)
menu.add_option("5", "Do Nothing", lambda: None, color=OneColors.BLUE)
menu.add_option(
"6",
"Long Running Task",
action=long_running_task,
spinner=True,
spinner_message="Working, please wait...",
spinner_type="moon",
spinner_style=OneColors.GREEN,
spinner_kwargs={"speed": 0.7},
)
menu.update_back_option("Q", "Quit", color=OneColors.DARK_RED)
try:
menu.run()
except EOFError as error:
logger.exception("EOFError: Exiting program.", exc_info=error)
print("Exiting program.")

28
menu/one_colors.py Normal file
View File

@ -0,0 +1,28 @@
from colors import ColorsMeta
class OneColors(metaclass=ColorsMeta):
BLACK = "#282C34"
GUTTER_GREY = "#4B5263"
COMMENT_GREY = "#5C6370"
WHITE = "#ABB2BF"
DARK_RED = "#BE5046"
LIGHT_RED = "#E06C75"
DARK_YELLOW = "#D19A66"
LIGHT_YELLOW = "#E5C07B"
GREEN = "#98C379"
CYAN = "#56B6C2"
BLUE = "#61AFEF"
MAGENTA = "#C678DD"
@classmethod
def as_dict(cls):
"""
Returns a dictionary mapping every NORD* attribute
(e.g. 'NORD0') to its hex code.
"""
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if not callable(getattr(cls, attr)) and
not attr.startswith("__")
}

9
menu/task.py Normal file
View File

@ -0,0 +1,9 @@
import random
import time
def risky_task() -> str:
if random.random() > 0.25:
time.sleep(1)
raise ValueError("Random failure occurred")
return "Task succeeded!"