Compare commits

..

15 Commits

Author SHA1 Message Date
7443084809 Update to Menu and Action 2025-03-27 22:02:16 -04:00
03af4f8077 Add menu 2025-03-24 22:14:39 -04:00
367a3d9523 Add maxis_loading 2025-03-09 18:24:12 -04:00
1742cec93b Update nord.py with metaclass 2025-03-09 16:58:30 -04:00
62d576b7fc Formatting 2025-03-03 00:06:41 -05:00
760ce82c47 Add nord.py 2025-03-03 00:04:11 -05:00
cf55dd1446 Add serialization, html 2024-06-13 19:03:57 -04:00
15e8953712 item_manager_table.py 2024-06-11 21:45:22 -04:00
68e9879b32 Update completer 2024-06-10 22:28:15 -04:00
060e94326a Add remove_item_by_name to item_list_manager.py 2024-06-09 21:55:42 -04:00
679ce2eddf Add item_list_manager.py 2024-06-09 21:31:28 -04:00
ac5c0fca17 Add item_manager.py 2024-06-09 19:11:26 -04:00
c608c48dfb Add comments 2024-06-09 15:52:30 -04:00
6c50bd267e formatting 2024-06-09 15:17:09 -04:00
7b872484e9 Add get_issues, logging 2024-06-09 15:08:36 -04:00
20 changed files with 2727 additions and 115 deletions

21
cli/get_issues.py Normal file
View File

@ -0,0 +1,21 @@
from jira import JIRA
from rich.console import Console
from jql_utils import JQLPrompt, load_config
config = load_config()
console = Console(color_system="truecolor")
jira = JIRA(server=config["server"], basic_auth=(config["username"], config["token"]))
jql_prompt = JQLPrompt(jira)
issues = jql_prompt.prompt()
console.print("[] Getting more issues... 🚀", style="#A3BE8C bold")
more_issues = jql_prompt.multi_prompt()
if issues or more_issues:
console.print("[] Saving logs... 🚀", style="#A3BE8C bold")
jql_prompt.save_log()
console.print("Exiting... 🚀", style="bold green")

97
cli/item_list_manager.py Normal file
View File

@ -0,0 +1,97 @@
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from collections import defaultdict
@dataclass
class Item:
id: str
name: str
json_data_list: List[Dict[str, Any]] = field(default_factory=list)
def add_json_data(self, json_data: Dict[str, Any]):
if json_data not in self.json_data_list:
self.json_data_list.append(json_data)
class ItemManager:
def __init__(self):
self.items_by_id: Dict[str, Item] = {}
self.items_by_name: Dict[str, Item] = {}
self.ordered_items: List[Item] = []
def add_item(self, json_obj: Dict[str, Any]):
item_id = json_obj['id']
item_name = json_obj['name']
if item_id not in self.items_by_id:
item = Item(id=item_id, name=item_name)
self.items_by_id[item_id] = item
self.items_by_name[item_name] = item
self.ordered_items.append(item)
else:
item = self.items_by_id[item_id]
item.add_json_data(json_obj['data'])
print(f"Item added or updated: {item}")
def add_items_from_json_list(self, json_list: List[Dict[str, Any]]):
for json_obj in json_list:
self.add_item(json_obj)
def remove_item_by_id(self, item_id: str):
if item_id in self.items_by_id:
item_to_remove = self.items_by_id.pop(item_id)
self.ordered_items = [item for item in self.ordered_items if item.id != item_id]
if item_to_remove.name in self.items_by_name:
del self.items_by_name[item_to_remove.name]
print(f"Item removed: {item_to_remove}")
else:
print(f"Item with ID {item_id} not found.")
def remove_item_by_name(self, name: str):
if name in self.items_by_name:
item_to_remove = self.items_by_name.pop(name)
self.ordered_items = [item for item in self.ordered_items if item.name != name]
if item_to_remove.id in self.items_by_id:
del self.items_by_id[item_to_remove.id]
print(f"Item removed: {item_to_remove}")
else:
print(f"Item with name {name} not found.")
def search_by_id(self, item_id: str) -> Optional[Item]:
return self.items_by_id.get(item_id)
def search_by_name(self, name: str) -> Optional[Item]:
return self.items_by_name.get(name)
def get_all_items(self) -> List[Item]:
return self.ordered_items
# Example usage:
item_manager = ItemManager()
# Initial JSON list
json_list_1 = [
{"id": "1", "name": "Item1", "data": {"key1": "value1"}},
{"id": "2", "name": "Item2", "data": {"key2": "value2"}},
{"id": "1", "name": "Item1", "data": {"key1": "value1a"}}
]
# Add items from the first list
item_manager.add_items_from_json_list(json_list_1)
item_manager.add_items_from_json_list(json_list_1)
# Fetch more JSON objects
json_list_2 = [
{"id": "3", "name": "Item3", "data": {"key3": "value3"}},
{"id": "4", "name": "Item4", "data": {"key4": "value4"}}
]
# Add new items if they are not already in the manager
item_manager.add_items_from_json_list(json_list_2)
item_manager.add_items_from_json_list(json_list_2)
# Remove an item by ID
item_manager.remove_item_by_id("1")
# Get all items
all_items = item_manager.get_all_items()
for item in all_items:
print(item)

75
cli/item_manager.py Normal file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python3
""" This is a simple example of a class that manages items. """
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass
class Item:
""" This is a simple class that represents an item. """
id: str
name: str
json_data: Dict[str, Any]
class ItemManager:
""" This is a simple class that manages items. """
def __init__(self):
self.items_by_id: Dict[str, Item] = {}
self.ordered_items: List[Item] = []
def add_item(self, item: Item):
if item.id not in self.items_by_id:
self.items_by_id[item.id] = item
self.ordered_items.append(item)
print(f"Item added: {item}")
else:
print(f"Item with ID {item.id} already exists.")
def add_items_from_json_list(self, json_list: List[Dict[str, Any]]):
for json_obj in json_list:
item = Item(id=json_obj['id'], name=json_obj['name'], json_data=json_obj)
self.add_item(item)
def remove_item_by_id(self, item_id: str):
if item_id in self.items_by_id:
item_to_remove = self.items_by_id.pop(item_id)
self.ordered_items = [item for item in self.ordered_items if item.id != item_id]
print(f"Item removed: {item_to_remove}")
else:
print(f"Item with ID {item_id} not found.")
def fetch_more_items_and_add(self, json_list: List[Dict[str, Any]]):
for json_obj in json_list:
item = Item(id=json_obj['id'], name=json_obj['name'], json_data=json_obj)
if item.id not in self.items_by_id:
self.add_item(item)
def get_all_items(self) -> List[Item]:
return self.ordered_items
def main():
item_manager = ItemManager()
json_list_1 = [
{"id": "1", "name": "Item1", "data": {"key1": "value1"}},
{"id": "2", "name": "Item2", "data": {"key2": "value2"}},
{"id": "1", "name": "Item1", "data": {"key1": "value1"}}
]
item_manager.add_items_from_json_list(json_list_1)
json_list_2 = [
{"id": "3", "name": "Item3", "data": {"key3": "value3"}},
{"id": "4", "name": "Item4", "data": {"key4": "value4"}}
]
item_manager.fetch_more_items_and_add(json_list_2)
item_manager.remove_item_by_id("2")
all_items = item_manager.get_all_items()
for item in all_items:
print(item)
if __name__ == "__main__":
main()

166
cli/item_manager_table.py Normal file
View File

@ -0,0 +1,166 @@
import pickle
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from rich.console import Console
from rich.text import Text
from rich.table import Table
@dataclass
class Item:
id: str
name: str
json_data_list: List[Dict[str, Any]] = field(default_factory=list)
data_type: str = 'new'
def add_json_data(self, json_data: Dict[str, Any], data_type: str):
if json_data not in self.json_data_list:
self.json_data_list.append(json_data)
self.data_type = data_type
class ItemManager:
def __init__(self):
self.items_by_id: Dict[str, Item] = {}
self.items_by_name: Dict[str, Item] = {}
self.ordered_items: List[Item] = []
self.new_count: int = 0
self.old_count: int = 0
self.total_count: int = 0
def add_item(self, json_obj: Dict[str, Any], data_type: str = 'new'):
item_id = json_obj['id']
item_name = json_obj['name']
if item_id not in self.items_by_id:
item = Item(id=item_id, name=item_name)
self.items_by_id[item_id] = item
self.items_by_name[item_name] = item
self.ordered_items.append(item)
self.total_count += 1
if data_type == 'new':
self.new_count += 1
else:
self.old_count += 1
else:
item = self.items_by_id[item_id]
if item.data_type == 'new' and data_type == 'old':
self.new_count -= 1
self.old_count += 1
elif item.data_type == 'old' and data_type == 'new':
self.old_count -= 1
self.new_count += 1
item.add_json_data(json_obj['data'], data_type)
print(f"Item added or updated: {item}")
def add_items_from_json_list(self, json_list: List[Dict[str, Any]], data_type: str = 'new'):
for json_obj in json_list:
self.add_item(json_obj, data_type)
def remove_item_by_id(self, item_id: str):
if item_id in self.items_by_id:
item_to_remove = self.items_by_id.pop(item_id)
self.ordered_items = [item for item in self.ordered_items if item.id != item_id]
if item_to_remove.name in self.items_by_name:
del self.items_by_name[item_to_remove.name]
self.total_count -= 1
if item_to_remove.data_type == 'new':
self.new_count -= 1
else:
self.old_count -= 1
print(f"Item removed: {item_to_remove}")
else:
print(f"Item with ID {item_id} not found.")
def remove_item_by_name(self, name: str):
if name in self.items_by_name:
item_to_remove = self.items_by_name.pop(name)
self.ordered_items = [item for item in self.ordered_items if item.name != name]
if item_to_remove.id in self.items_by_id:
del self.items_by_id[item_to_remove.id]
self.total_count -= 1
if item_to_remove.data_type == 'new':
self.new_count -= 1
else:
self.old_count -= 1
print(f"Item removed: {item_to_remove}")
else:
print(f"Item with name {name} not found.")
def search_by_name(self, name: str) -> Optional[Item]:
return self.items_by_name.get(name)
def search_by_id(self, item_id: str) -> Optional[Item]:
return self.items_by_id.get(item_id)
def get_all_items(self) -> List[Item]:
return self.ordered_items
def get_counts(self) -> Dict[str, int]:
return {
'new': self.new_count,
'old': self.old_count,
'total': self.total_count
}
def get_item_tuples(self) -> List[tuple]:
tuples = []
for item in self.ordered_items:
color = "#D08770" if item.data_type == "new" else "#B48EAD"
tuples.append((item.name, color))
return tuples
def print_items_as_table(self):
console = Console()
table = Table(title="Items")
table.add_column("Item Name", justify="left", style="bold")
for item_name, color in self.get_item_tuples():
text = Text(item_name)
text.stylize(color)
table.add_row(text)
console.print(table)
def save_to_file(self, filename: str):
with open(filename, 'wb') as f:
pickle.dump(self, f)
print(f"ItemManager saved to {filename}")
@staticmethod
def load_from_file(filename: str) -> 'ItemManager':
with open(filename, 'rb') as f:
item_manager = pickle.load(f)
print(f"ItemManager loaded from {filename}")
return item_manager
@classmethod
def from_json_list(cls, json_list: List[Dict[str, Any]]):
item_manager = cls()
item_manager.add_items_from_json_list(json_list)
return item_manager
item_manager = ItemManager()
json_list_1 = [
{"id": "1", "name": "Item1", "data": {"key1": "value1"}},
{"id": "2", "name": "Item2", "data": {"key2": "value2"}},
{"id": "1", "name": "Item1", "data": {"key1": "value1a"}}
]
item_manager.add_items_from_json_list(json_list_1, data_type='new')
json_list_2 = [
{"id": "3", "name": "Item3", "data": {"key3": "value3"}},
{"id": "4", "name": "Item4", "data": {"key4": "value4"}}
]
item_manager.add_items_from_json_list(json_list_2, data_type='old')
item_manager.print_items_as_table()
# Save to file
item_manager.save_to_file('item_manager.pkl')
# Load from file
loaded_item_manager = ItemManager.load_from_file('item_manager.pkl')
# Print loaded items as table
loaded_item_manager.print_items_as_table()

View File

@ -1,70 +0,0 @@
from prompt_toolkit.lexers import Lexer
from prompt_toolkit.styles import Style
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import StyleAndTextTuples
from typing import Callable
class JQLLexer(Lexer):
def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]:
text = document.text
tokens = []
keywords = {
"AND", "OR", "NOT", "IN", "ORDER BY", "ASC", "DESC",
"IS", "NULL", "TRUE", "FALSE", "EMPTY"
}
operators = {
"=", "!", ">", "<", ">=", "<=", "~", "!~", "!="
}
punctuations = {"(", ")", ",", ":", " "}
pos = 0
word = ''
while pos < len(text):
char = text[pos]
if char.isalpha():
word += char
else:
if word:
if word.upper() in keywords:
tokens.append(('class:keyword', word))
else:
tokens.append(('class:name', word))
word = ''
if char in operators:
tokens.append(('class:operator', char))
elif char in punctuations:
tokens.append(('class:punctuation', char))
elif char.isspace():
tokens.append(('class:text', char))
else:
tokens.append(('class:error', char))
pos += 1
if word:
if word.upper() in keywords:
tokens.append(('class:keyword', word))
else:
tokens.append(('class:name', word))
return lambda i: tokens
# Example usage
from prompt_toolkit import PromptSession
custom_style = Style.from_dict({
'keyword': '#ff0066 bold',
'operator': '#00ff00',
'name': '#0000ff',
'punctuation': '#00ffff',
'text': '#ffffff',
'error': '#ff0000 bold',
})
session = PromptSession(lexer=JQLLexer(), style=custom_style)
text = session.prompt('Enter JQL: ')
print(f'You entered: {text}')

View File

@ -1,22 +1,28 @@
from itertools import chain
import json
from typing import Dict, List
from datetime import datetime
from typing import Dict, List, Optional, Iterable
from jira import JIRA
from jira.exceptions import JIRAError
from jira.resources import Issue
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.shortcuts import confirm
from prompt_toolkit.styles import Style
from prompt_toolkit.validation import ValidationError, Validator
from prompt_toolkit.shortcuts import confirm
from prompt_toolkit.formatted_text import HTML, merge_formatted_text
from pygments.lexer import RegexLexer
from pygments.token import Error, Keyword, Name, Operator, Punctuation, String, Text, Whitespace, _TokenType
from pygments.token import (Error, Keyword, Name,
Operator, Punctuation,
String, Text, Whitespace, _TokenType)
from rich.console import Console
from rich.text import Text as RichText
class JQLLexer(RegexLexer):
""" JQL Lexer for Pygments. """
name = "JQL"
aliases = ["jql"]
filenames = ["*.jql"]
@ -34,14 +40,14 @@ class JQLLexer(RegexLexer):
),
(
r"(?i)\b(?:A|AND|ARE|AS|AT|BE|BUT|BY|FOR|IF|INTO|IT|NO|OF|ON|OR|S|SUCH|T|THAT|THE|THEIR|THEN|"
r"THERE|THESE|THEY|THIS|TO|WILL|WITH)\b",
r"THERE|THESE|THEY|THIS|TO|WILL|WITH|ORDER BY|ASC|DESC)\b",
Keyword,
),
(
r"(?i)\b(?:Assignee|affectedVersion|Attachments|Category|Comment|Component|Created|createdDate|"
r"Creator|Description|Due|duedate|Filter|fixVersion|issuekey|issuetype|issueLinkType|Labels|"
r"lastViewed|Priority|Project|Reporter|Resolved|Sprint|Status|statusCategory|Summary|Text|"
r"timespent|Voter|Watcher)\b",
r"(?i)\b(?:assignee|affectedVersion|attachments|category|comment|component|created|createdDate|"
r"creator|cescription|due|duedate|filter|fixVersion|issuekey|issuetype|issueLinkType|labels|"
r"lastViewed|priority|project|reporter|resolved|Sprint|status|statusCategory|summary|text|"
r"timespent|updated|updatedDate|voter|watcher|watchers)\b",
Name.Attribute,
),
(r"(?i)(=|!=|<|>|<=|>=|~|!~|IN|NOT IN|IS|IS NOT|WAS|WAS IN|WAS NOT IN|WAS NOT)", Operator),
@ -61,6 +67,9 @@ class JQLLexer(RegexLexer):
class JQLStyles:
""" JQL Styles for Pygments.
Based on the Nord color palette: https://www.nordtheme.com/docs/colors-and-palettes
"""
nord: Style = Style.from_dict(
{
"pygments.whitespace": "#FFFFFF",
@ -95,6 +104,7 @@ class JQLStyles:
"Attributes": "#B48EAD",
"Operators": "#EBCB8B bold",
"Projects": "#D08770",
"Order": "#BF616A bold",
}
@ -131,6 +141,7 @@ completions: Dict[str, List[str]] = {
"TO",
"WILL",
"WITH",
"ORDER BY"
],
"Functions": [
"issueHistory",
@ -156,6 +167,7 @@ completions: Dict[str, List[str]] = {
"assignee",
"affectedVersion",
"attachments",
"category",
"comment",
"component",
"created",
@ -178,6 +190,7 @@ completions: Dict[str, List[str]] = {
"summary",
"text",
"timespent",
"updated",
"voter",
"watcher",
],
@ -211,17 +224,24 @@ completions: Dict[str, List[str]] = {
"ASTRAL",
"PHOTON",
],
"Order": [
"ASC",
"DESC",
],
}
class JQLPrinter:
""" JQL Printer to print JQL queries with syntax highlighting. """
def __init__(self, console: Console):
self.console = console
def print(self, text: str):
""" Print JQL query with syntax highlighting. """
self.console.print(self.pygments_to_rich(text), end="")
def pygments_to_rich(self, text):
""" Convert Pygments tokens to RichText. """
tokens = list(JQLLexer().get_tokens(text))
rich_text = RichText()
for token_type, value in tokens:
@ -231,10 +251,12 @@ class JQLPrinter:
class JQLValidator(Validator):
""" JQL Validator to validate JQL queries. """
def __init__(self, jira_instance):
self.jira = jira_instance
def validate(self, document):
""" Validate JQL query. """
text = document.text
if text.lower() == "b" or text.lower() == "exit":
return
@ -248,24 +270,59 @@ class JQLValidator(Validator):
class JQLCompleter(Completer):
"""Custom JQL completer to categorize and color completions."""
def __init__(self, categorized_completions):
def __init__(self, categorized_completions: Dict[str, List[str]]):
self.categorized_completions = categorized_completions
def get_completions(self, document, complete_event):
text = document.get_word_before_cursor().lower()
def get_completions(self, document, complete_event) -> Iterable[Completion]:
text_before_cursor = document.text_before_cursor.lower().strip()
words = text_before_cursor.split()
if document.text_before_cursor and document.text_before_cursor[-1].isspace():
return self._get_next_word_completions(words, text_before_cursor)
else:
return self._get_current_word_completions(words[-1] if words else '')
def _get_next_word_completions(self, words: List[str], text_before_cursor: str) -> Iterable[Completion]:
if not words:
return chain(self._get_category_completions("Attributes"),
self._get_category_completions("Functions"))
last_word = words[-1]
if last_word in ["and", "or"]:
return chain(self._get_category_completions("Functions"),
self._get_category_completions("Attributes"))
if last_word in self.categorized_completions.get("Operators", []):
return self._get_category_completions("Projects")
if last_word in ["order", "by"]:
return self._get_category_completions("Attributes")
if "order by" in text_before_cursor:
return self._get_category_completions("Order")
if last_word in self.categorized_completions.get("Attributes", []):
return self._get_category_completions("Operators")
return []
def _get_current_word_completions(self, word: str) -> Iterable[Completion]:
for category, words in self.categorized_completions.items():
for word in words:
if text in word.lower():
display_text = f"{word}"
yield Completion(
word,
start_position=-len(text),
display=display_text,
for completion_word in words:
if word in completion_word.lower():
yield Completion(completion_word,
start_position=-len(word),
display=completion_word,
display_meta=category,
style=f"fg: #D8DEE9 bg: {JQLStyles.completion.get(category, 'white')}",
selected_style=f"fg: {JQLStyles.completion.get(category, 'white')} bg: #D8DEE9",
)
def _get_category_completions(self, category: str) -> Iterable[Completion]:
for word in self.categorized_completions.get(category, []):
yield Completion(word, display=word, display_meta=category)
def load_config():
with open("config.json") as json_file:
@ -281,21 +338,30 @@ def load_config():
class JQLPrompt:
def __init__(self, jira, console):
self.jira = jira
self.console = console
self.session = self.create_jql_prompt_session()
self.jql = JQLPrinter(console)
self.query_count = 0
""" JQL Prompt to interact with JIRA using JQL queries. """
def __init__(self, jira):
self.jira: JIRA = jira
self.console: Console = Console(color_system="truecolor", record=True)
self.session: PromptSession = self.create_jql_prompt_session()
self.jql: JQLPrinter = JQLPrinter(self.console)
self.query_count: int = 0
self.issue_count: int = 0
self.total_issue_count: int = 0
self.issues: List[Issue] = []
def get_query_count(self):
space = self.console.width // 4
query_count_str = f"Query count: {self.query_count}"
plain_text = f"{query_count_str:^{space}}{query_count_str:^{space}}{query_count_str:^{space}}{query_count_str:^{space}}"
return [("bg:#2E3440 #D8DEE9", plain_text)]
space = self.console.width // 3
query_count_str = f"Query Count: {self.query_count}" if self.query_count else ""
query_count_html = HTML(f"<b><style fg='#2E3440' bg='#88C0D0'>{query_count_str:^{space}}</style></b>")
issue_count_str = f"Issues Added: {self.issue_count}" if self.issue_count else ""
issue_count_html = HTML(f"<b><style fg='#2E3440' bg='#B48EAD'>{issue_count_str:^{space}}</style></b>")
total_issue_count_str = f"Total Issues: {self.total_issue_count}" if self.total_issue_count else ""
total_issue_count_html = HTML(f"<b><style fg='#2E3440' bg='#D8DEE9'>{total_issue_count_str:^{space}}</style></b>")
next_line = HTML(f"<style fg='#2E3440' bg='#88C0D0'>\n{'multiline toolbar':<{self.console.width}}</style>")
return merge_formatted_text([query_count_html, issue_count_html, total_issue_count_html, next_line])
def create_jql_prompt_session(self):
completer = JQLCompleter(completions)
completer: JQLCompleter = JQLCompleter(completions)
return PromptSession(
message=[("#B48EAD", "JQL \u276f ")],
lexer=PygmentsLexer(JQLLexer),
@ -310,8 +376,13 @@ class JQLPrompt:
validate_while_typing=False,
)
def get_input(self):
user_input = self.session.prompt()
def prompt(self) -> Optional[List[Issue]]:
""" Prompt the user for a JQL query.
Returns:
Optional[List[Issue]]: List of JIRA issues.
"""
user_input: str = self.session.prompt()
self.issue_count = 0
if not user_input:
do_empty_query = confirm(
[("#EBCB8B bold", "[?] "), ("#D8DEE9 bold", "Do you want to perform an empty query?")],
@ -328,21 +399,44 @@ class JQLPrompt:
self.query_count += 1
self.console.print(
RichText.assemble(
(f"[+] Found {len(issues)} issues from JQL query: ", "green bold"),
(f"[+] Found {len(issues)} issues from JQL query: ", "#A3BE8C bold"),
self.jql.pygments_to_rich(user_input),
),
end="",
)
for issue in issues:
self.console.print(f"{issue.key}: {issue.fields.summary}")
return issues
self.console.print("[!] No issues found.", style="#BF616A bold")
def run(self):
def multi_prompt(self) -> Optional[List[Issue]]:
""" Prompt the user for multiple JQL queries.
Returns:
Optional[List[Issue]]: List of JIRA issues.
"""
self.issues = []
while True:
try:
self.get_input()
issues = self.prompt()
if issues:
issues = [issue for issue in issues if issue not in self.issues]
self.issues.extend(issues)
self.issue_count += len(issues)
self.total_issue_count += len(issues)
self.console.print(f"[] Total issues: {len(self.issues)}", style="#D8DEE9")
get_more = confirm([("#A3BE8C", "[?] Get more issues?")], suffix=[("#81A1C1 bold", " (Y/n) ")])
if not get_more:
break
except (EOFError, KeyboardInterrupt):
break
self.console.print("Goodbye!", style="#BF616A bold")
if self.issues:
self.console.print(f"[+] Issues added: {self.total_issue_count}", style="#A3BE8C bold")
return self.issues
self.console.print("[!] No issues added.", style="#BF616A bold")
def save_log(self):
""" Save the console log to a file. """
log_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
with open(f"jql_{log_time}.txt", "w") as log_file:
log_file.write(self.console.export_text())
"""
@ -362,10 +456,9 @@ class JQLPrompt:
def main():
config = load_config()
console = Console(color_system="truecolor")
jira = JIRA(server=config["server"], basic_auth=(config["username"], config["token"]))
prompt = JQLPrompt(jira, console)
prompt.run()
prompt = JQLPrompt(jira)
prompt.multi_prompt()
if __name__ == "__main__":

408
maxis_loading Normal file
View File

@ -0,0 +1,408 @@
Abolishing Pedestrian Posturing
Abstracting Loading Procedures
Activating Deviance Threshold
Activating Hotel Staff
Activating Story Arc
Adapting Behavioral Model
Adding Hidden Agendas
Adjusting Acceptable Apparel
Adjusting Bell Curves
Adjusting Emotional Weights
Anesthetizing Industrial Areas
Aggregating Need Agents
Aligning Covariance Matrices
Alphabetizing Books
Amplifying Sun to '11'
Analyzing Adolescent Angst
Analyzing Axe Trajectories
Applying Feng Shui Shaders
Applying Lampshade Headwear
Applying Theatre Soda Layer
Appointing Appealing Appurtenances
Ascending Maslow's Hierarchy
Assembling Playground
Assembling Shower Stalls
Asserting Packed Exemplars
Assessing Loam Particle Sizes
Assigning Mimic Propagation
Atomizing Atomic Particles
Attempting to Lock Back-Buffer
Augmenting Assets
Augmenting Occupational Conduits
Baking Bread for Toasters
Balancing Domestic Coefficients
Besmirching Reputations
Binding Sapling Root System
Binding Trace Enchantments
Blurring Reality Lines
Borrowing Something Blue
Boxing BILLY Bookcases
Branching Family Trees
Breaking Down Restorable Cars
Breeding Fauna
Bribing The Orangutans
Buffing Splines for Reticulation
Building Bedroom Displays
Building Boring Bedrooms? As If!
Building Data Trees
Building High Ceilings
Bureacritizing Bureaucracies
Burning Calories
Cabalizing NPC Controls
Caffeinating Student Body
Calculating Exchange Rate
Calculating Inverse Probability Matrices
Calculating Lifetime Aspirations
Calculating Llama Expectoration Trajectory
Calculating Maximum Velocity
Calculating Money Supply
Calculating Native Restlessness
Calculating Snowball Trajectories
Calculating Vincent's Wealth
Calibrating Blue Skies
Calibrating Canine Customization
Calibrating Fame Indicant
Calibrating Personality Matrix
Calling Psychic Phone Pals
Canceling Un-cancelable Actions
Capacitating Genetic Modifiers
Captivating Conspiracy Theorists
Capturing Youthful Exuberance
Catalyzing Chemicals
Catching Cat Burglars
Ceiling Fan Rotation = dL/dT
Charging Ozone Layer
Checkering Flags
Chlorinating Car Pools
Clearing Shipping Lanes
Cluttering Closets
Coalescing Cloud Formations
Cohorting Exemplars
Collecting Meteor Particles
Compiling Reticulated Splines
Compiling Riley's Wardrobe
Composing Melodic Euphony
Compositing Vampiric Complexions
Compounding Inert Tessellations
Compressing Fish Files
Computing Optimal Bin Packing
Concatenating Sub-Contractors
Concatenating Vertex Nodes
Configuring Lemony Squeezation
Configuring Studio Operations
Constructing Clothes Hangers
Containing Existential Buffer
Cooling Down Refrigerators
Crash-Proofing Parties
Creating Handmade Lampshades
Crenellating Crenellations
Cultivating Quality and Class
Dampening Stray Generators
De-chlorophyllizing Leaves
De-inviting Don Lothario
De-wrinkling Worry-Free Clothing
Debarking Ark Ramp
Debunching Unionized Commercial Services
Deciding What Message to Display Next
Decomposing Singular Values
Decrementing Alice's Funds
Decrementing Feline Life-Count
Decrementing Tectonic Plates
Deleting Ferry Routes
Delineating Mask Dynamics
Depixelating Inner Mountain Surface Back Faces
Depositing Slush Funds
Desalinizing Snorkels
Destabilizing Economic Indicators
Destabilizing Orbital Payloads
Determining Rent Guidelines
Determining Width of Blast Fronts
Deunionizing Bulldozers
Developing Delicious Designs
Dicing Models
Diluting Livestock Nutrition Variables
Disgruntling Employees
Distilling Doggie Dynamics
Distressing Jeans
Downloading Satellite Terrain Data
Downloading Weather Data
Dragon-proofing Dressers
Dumbing Down Doofuses
Eliminating Would-be Chicanery
Enabling Lot Commercialization
Enforcing Storyline
Enhancing Crown Reflectivity
Enlisting Elite Forces
Ensuring Transplanar Synergy
Eschewing Everyday Aesthetics
Establishing Gift Registry
Estimating Volcanic Activity
Examining Tiles from All Zooms and Angles
Exposing Flash Variables to Streak System
Extracting Resources
Extrapolating Empire Eigenvectors
Extruding Mesh Terrain
Fabricating Imaginary Infrastructure
Faceting Precious Gems
Factoring Fairy Frolicking Frequencies
Factoring Hobby Enthusiasm
Factoring Pay Scale
Falsifying Faux Finishes
Fashioning Late Arrivals
Fiercely Reticulating Splines
Filling in the Blanks
Fixing Election Outcome Matrix
Flavorizing Side-Dishes
Flood-Filling Ground Water
Flushing Pipe Network
Formulating Fitting Rooms
Gathering Particle Sources
Generating Compatible Roommates
Generating Gothic Glamour
Generating Intrigue
Generating Jobs
Generating Population Model
Generating Sand Grains
Generating Schmoozing Algorithm
Gesticulating Mimes
Gleefully Stacking Inventories
Going Apartment Hunting
Graduating Scholars
Graphing Whale Migration
Grooming Grooms
Growing Greener Gardens
Happy 14th Birthday Reticulated Splines!
Hiding Garden Gnomes
Hiding Willio Webnet Mask
Homogenizing Interest Anatomy
Hybridizing Plant Material
Hydrating Harvestables
Hyperactivating Children
Igniting Pilot Lights
Implementing Impeachment Routine
Importing Entertainment Talent
Importing Personality Anchors
Increasing Accuracy of RCI Simulators
Increasing Magmafacation
Increasing Water Plant Population
Individualizing Snowflakes
Infuriating Furious Bits
Initializing Dastardly Schemes
Initializing Forth-Rallying Protocol
Initializing My Sim Tracking Mechanism
Initializing Operant Construct
Initializing Rhinoceros Breeding Timetable
Initializing Robotic Click-Path AI
Initializing Secret Societies
Inserting Chaos Generator
Inserting Extension Algorithms
Inserting Sublimated Messages
Integrating Curves
Integrating Illumination Form Factors
Integrating Population Graphs
Intensifying Hawaiian Prints
Interpreting Family Values
Inventing Internets
Inverting Career Ladder
Invigorating Dull Habitations
Iterating Cellular Automata
Iterating Chaos Array
Lacing Football Cleats
Launching SimSat 9000
Lecturing Errant Subsystems
Leveling Playing Fields
Like, Totally Reticulating Splines, Dude
Limiting Litterbox Loads
Loading "First Batch" Metrics
Loading "Vroom" Sounds
Loading School Spirit Algorithm
Locating Misplaced Calculations
Making a Little Bit of Magic
Making a Mess
Making Manic Mansions
Making Many Mini Wrenches
Making Owners Look Like Pets
Making Pets Look Like Owners
Making Stephen Loyal
Managing Managers' Managers
Manipulating Modal Memory
Mapping Influence Attributes
Mapping The Llama Genome
Matching Walls and Floors
Maximizing Social Network
Meditating Modifiers
Mingling
Mitigating Time-Stream Discontinuities
Mixing Genetic Pool
Modeling Marquetry
Modeling Object Components
Monitoring Moody Minors
Mopping Occupant Leaks
Navigating Stormy Waters
Neutralizing Shuriken Oxidization
Normalizing Power
Normalizing Social Network
Obfuscating Quigley Matrix
Optimizing Baking Temperature
Originating Ocean Currents
Over-Waxing Banisters
Overconstraining Dirty Industry Calculations
Partitioning City Grid Singularities
Partitioning Prose
Partitioning Social Network
Perfecting Playground Pieces
Performing A Sound Check
Perturbing Matrices
Pixalating Nude Patch
Polarizing Image Conduits
Polishing Water Highlights
Populating Empyreal Entities
Populating Lot Templates
Populating Yards with Bugs and Birds
Pre-fluffing Pillows
Pre-Inking Simoleon Plates
Predicating Predestined Paths
Predicting Pagoda Peaks
Predicting Puddle Prevalence
Predicting Weather Unpredictability
Prelaminating Drywall Inventory
Preparing a Tasty Grilled Cheese Sandwich
Preparing Bacon for Homeward Transportation
Preparing Captive Simulators
Preparing for Pops and Locks
Preparing Perfect Plumbing
Preparing Personal Spaces
Preparing Sprites for Random Walks
Preparing Vacation Days
Pressurizing Fruit Punch Barrel Hydraulics
Priming Mascot Mischief Coefficients
Prioritizing Landmarks
Projecting Law Enforcement Pastry Intake
Proscribing Plebeian Palates
Putting Down Toilet Seats
Randomizing Inhabitant Characteristics
Rasterizing Reputation Algorithms
Rasterizing Rodent Residences
Re-Activating Story Arc
Re-Inverting Career Ladder
Re-Re-Re-Re-Re-Reticulating Splines
Readjusting Emotional Weights
Readying Relaxation Receptors
Realigning Alternate Time Frames
Recomputing Mammal Matrix
Reconfiguring Genetic Algorithms
Reconfiguring User Mental Processes
Recruiting Snooty Food Judges
Recycling Hex Decimals
Redefining Family Values
Redistributing Resources
Rehearsing Dinner
Reinforcing Story Lines
Relaxing Splines
Remodeling Spline Reticulator
Removing Road Network Speed Bumps
Removing Texture Gradients
Removing Vehicle Avoidance Behavior
Renewing Urban Combinatorics
Replacing Wheel Bearings
Requisitioning Alumni Donations
Resolving GUID Conflict
Restocking Sim Inventories
Reticulated Splines for Sale: §2000
Reticulating 3-Dimensional Splines
Reticulating 4-D Splines
Reticulating Dog Show Object Splines
Reticulating Golden Splines
Reticulating Graduated Splines
Reticulating Ninja Splines
Reticulating Splines
Reticulating Splines Again
Reticulating Splines in the Zone
Reticulating Splines One Last Time
Reticulating Story Splines
Reticulating Underwater Splines
Reticulating Unreticulated Splines
Reticulator of Splines Reticulating
Retracting Phong Shader
Retrieving from Back Store
Reverse Engineering Image Consultant
Reverse-Engineering Party Scores
Rezoning Residual Residents
Roof = Roof(1/3*pi*r^2*h)
Routing Neural Network Infanstructure
Scattering Rhino Food Sources
Scheduling Copious Catnaps
Scolding Splines for Reticulating
Scrubbing Terrain
Searching for Llamas
Securing Online Grades Database
Seeding Architecture Simulation Parameters
Sequencing Cinematic Specifiers
Sequencing Particles
Setting Advisor Moods
Setting Inner Deity Indicators
Setting Universal Physical Constants
Severing Civilization Connections
Shampooing Dirty Rugs
Simmering Swedish Meatballs
Simulating Program Execution
Simulating Sparkling Surfaces
Some Spline Reticulating Required
Sonically Enhancing Occupant-Free Timber
Space Ponies: Achieved
Sparking Imaginations
Spawning Sights to See
Speculating Stock Market Indices
Spinning New Tunes
Splatting Transforms
Spooling IKEA Awesomeness
Spreading Rumors
Still Reticulating Splines
Stocking Clearance Racks
Stocking Ponds
Stocking Stylish Sinks
Stooping and Scooping
Stopping To Smell The Flowers
Storing Solar Energy
Stratifying Ground Layers
Strengthening Award Foundations
Stress-Testing POÄNG Chairs
Stuffing Genies Into Bottles
Sub-Sampling Water Data
Submerging Bedroom Furniture
Supplying Self-Serve Furniture Area
Sweetening Sixteens
Synthesizing Gravity
Synthesizing Natural Selection
Synthesizing Wavelets
Tabulating Spell Effectors
Tabulating Traits
Tailoring Trendy Threads
Taking Countertops for Granite
Teasing Teenage Hair-dos
Telling Splines to Reticulate More Quietly
Testing Test Subjects
Testing Underworld Telecommunications
Texture-Compositing Teddy Bears
Threading Fabric Compositors
Threading Sewing Needles
Time-Compressing Simulator Clock
Timing Temperature Transference
Training Team Mascots
Training Tour Guides
Transmitting Message Bottles
Turning On Turn-Ons
Twisting Spiral Staircases
Unable to Reveal Current Activity
Unexpectedly Reticulating Splines
Unfolding Foldy Chairs
Unfolding Helix Packet
Unloading Loading Screens
Updating Hotel Registry
Updating Vacancy Request Hotline
Upgrading Gadgets
Upholstering Sofas and Loveseats
Weathering Buildings
Wrangling All Wreckage
Writing Scrolling Startup String Text
Zeroing Crime Network

218
menu/action.py Normal file
View File

@ -0,0 +1,218 @@
"""action.py
Any Action or Option is callable and supports the signature:
result = thing(*args, **kwargs)
This guarantees:
- Hook lifecycle (before/after/error/teardown)
- Timing
- Consistent return values
"""
from __future__ import annotations
import asyncio
import logging
import time
import inspect
from abc import ABC, abstractmethod
from typing import Optional
from hook_manager import HookManager
from menu_utils import TimingMixin, run_async
logger = logging.getLogger("menu")
class BaseAction(ABC, TimingMixin):
"""Base class for actions. They are the building blocks of the menu.
Actions can be simple functions or more complex actions like
`ChainedAction` or `ActionGroup`. They can also be run independently
or as part of a menu."""
def __init__(self, name: str, hooks: Optional[HookManager] = None):
self.name = name
self.hooks = hooks or HookManager()
self.start_time: float | None = None
self.end_time: float | None = None
self._duration: float | None = None
def __call__(self, *args, **kwargs):
context = {
"name": self.name,
"duration": None,
"args": args,
"kwargs": kwargs,
"action": self
}
self._start_timer()
try:
run_async(self.hooks.trigger("before", context))
result = self._run(*args, **kwargs)
context["result"] = result
return result
except Exception as error:
context["exception"] = error
run_async(self.hooks.trigger("on_error", context))
if "exception" not in context:
logger.info(f"✅ Recovery hook handled error for Action '{self.name}'")
return context.get("result")
raise
finally:
self._stop_timer()
context["duration"] = self.get_duration()
if "exception" not in context:
run_async(self.hooks.trigger("after", context))
run_async(self.hooks.trigger("on_teardown", context))
@abstractmethod
def _run(self, *args, **kwargs):
raise NotImplementedError("_run must be implemented by subclasses")
async def run_async(self, *args, **kwargs):
if inspect.iscoroutinefunction(self._run):
return await self._run(*args, **kwargs)
return await asyncio.to_thread(self.__call__, *args, **kwargs)
def __await__(self):
return self.run_async().__await__()
@abstractmethod
def dry_run(self):
raise NotImplementedError("dry_run must be implemented by subclasses")
def __str__(self):
return f"<{self.__class__.__name__} '{self.name}'>"
def __repr__(self):
return str(self)
class Action(BaseAction):
def __init__(self, name: str, fn, rollback=None, hooks=None):
super().__init__(name, hooks)
self.fn = fn
self.rollback = rollback
def _run(self, *args, **kwargs):
if inspect.iscoroutinefunction(self.fn):
return asyncio.run(self.fn(*args, **kwargs))
return self.fn(*args, **kwargs)
def dry_run(self):
print(f"[DRY RUN] Would run: {self.name}")
class ChainedAction(BaseAction):
def __init__(self, name: str, actions: list[BaseAction], hooks=None):
super().__init__(name, hooks)
self.actions = actions
def _run(self, *args, **kwargs):
rollback_stack = []
for action in self.actions:
try:
result = action(*args, **kwargs)
rollback_stack.append(action)
except Exception:
self._rollback(rollback_stack, *args, **kwargs)
raise
return None
def dry_run(self):
print(f"[DRY RUN] ChainedAction '{self.name}' with steps:")
for action in self.actions:
action.dry_run()
def _rollback(self, rollback_stack, *args, **kwargs):
for action in reversed(rollback_stack):
if hasattr(action, "rollback") and action.rollback:
try:
print(f"↩️ Rolling back {action.name}")
action.rollback(*args, **kwargs)
except Exception as e:
print(f"⚠️ Rollback failed for {action.name}: {e}")
class ActionGroup(BaseAction):
def __init__(self, name: str, actions: list[BaseAction], hooks=None):
super().__init__(name, hooks)
self.actions = actions
self.results = []
self.errors = []
def _run(self, *args, **kwargs):
asyncio.run(self._run_async(*args, **kwargs))
def dry_run(self):
print(f"[DRY RUN] ActionGroup '{self.name}' (parallel execution):")
for action in self.actions:
action.dry_run()
async def _run_async(self, *args, **kwargs):
async def run(action):
try:
result = await asyncio.to_thread(action, *args, **kwargs)
self.results.append((action.name, result))
except Exception as e:
self.errors.append((action.name, e))
await self.hooks.trigger("before", name=self.name)
await asyncio.gather(*[run(a) for a in self.actions])
if self.errors:
await self.hooks.trigger("on_error", name=self.name, errors=self.errors)
else:
await self.hooks.trigger("after", name=self.name, results=self.results)
await self.hooks.trigger("on_teardown", name=self.name)
# if __name__ == "__main__":
# # Example usage
# def build(): print("Build!")
# def test(): print("Test!")
# def deploy(): print("Deploy!")
# pipeline = ChainedAction("CI/CD", [
# Action("Build", build),
# Action("Test", test),
# ActionGroup("Deploy Parallel", [
# Action("Deploy A", deploy),
# Action("Deploy B", deploy)
# ])
# ])
# pipeline()
# Sample functions
def sync_hello():
time.sleep(1)
return "Hello from sync function"
async def async_hello():
await asyncio.sleep(1)
return "Hello from async function"
# Example usage
async def main():
sync_action = Action("sync_hello", sync_hello)
async_action = Action("async_hello", async_hello)
print("⏳ Awaiting sync action...")
result1 = await sync_action
print("", result1)
print("⏳ Awaiting async action...")
result2 = await async_action
print("", result2)
print(f"⏱️ sync took {sync_action.get_duration():.2f}s")
print(f"⏱️ async took {async_action.get_duration():.2f}s")
if __name__ == "__main__":
asyncio.run(main())

59
menu/bottom_bar.py Normal file
View File

@ -0,0 +1,59 @@
from prompt_toolkit.formatted_text import HTML, merge_formatted_text
from typing import Callable, Literal, Optional
from rich.console import Console
class BottomBar:
def __init__(self, columns: int = 3):
self.columns = columns
self.console = Console()
self._items: list[Callable[[], HTML]] = []
self._named_items: dict[str, Callable[[], HTML]] = {}
self._states: dict[str, any] = {}
def get_space(self) -> str:
return self.console.width // self.columns
def add_static(self, name: str, text: str) -> None:
def render():
return HTML(f"<style fg='#D8DEE9'>{text:^{self.get_space()}}</style>")
self._add_named(name, render)
def add_counter(self, name: str, label: str, current: int, total: int) -> None:
self._states[name] = (label, current, total)
def render():
l, c, t = self._states[name]
text = f"{l}: {c}/{t}"
return HTML(f"<style fg='#A3BE8C'>{text:^{self.get_space()}}</style>")
self._add_named(name, render)
def add_toggle(self, name: str, label: str, state: bool) -> None:
self._states[name] = (label, state)
def render():
l, s = self._states[name]
color = '#A3BE8C' if s else '#BF616A'
status = "ON" if s else "OFF"
text = f"{l}: {status}"
return HTML(f"<style fg='{color}'>{text:^{self.get_space()}}</style>")
self._add_named(name, render)
def update_toggle(self, name: str, state: bool) -> None:
if name in self._states:
label, _ = self._states[name]
self._states[name] = (label, state)
def update_counter(self, name: str, current: Optional[int] = None, total: Optional[int] = None) -> None:
if name in self._states:
label, c, t = self._states[name]
self._states[name] = (label, current if current is not None else c, total if total is not None else t)
def _add_named(self, name: str, render_fn: Callable[[], HTML]) -> None:
self._named_items[name] = render_fn
self._items = list(self._named_items.values())
def render(self):
return merge_formatted_text([fn() for fn in self._items])

71
menu/callbacks.py Normal file
View File

@ -0,0 +1,71 @@
import asyncio
import functools
import inspect
import logging
import random
import time
from logging_utils import setup_logging
from rich.console import Console
console = Console()
setup_logging()
logger = logging.getLogger("menu")
def retry(max_retries=3, delay=1, backoff=2, exceptions=(Exception,)):
def decorator(func):
is_coroutine = inspect.iscoroutinefunction(func)
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
retries, current_delay = 0, delay
while retries <= max_retries:
if logger:
logger.debug(f"Retrying {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{exceptions}'.")
try:
return await func(*args, **kwargs)
except exceptions as e:
if retries == max_retries:
if logger:
logger.exception(f"❌ Max retries reached for '{func.__name__}': {e}")
raise
if logger:
logger.warning(
f"🔄 Retry {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{e}'."
)
await asyncio.sleep(current_delay)
retries += 1
current_delay *= backoff
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
retries, current_delay = 0, delay
while retries <= max_retries:
if logger:
logger.debug(f"Retrying {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{exceptions}'.")
try:
return func(*args, **kwargs)
except exceptions as e:
if retries == max_retries:
if logger:
logger.exception(f"❌ Max retries reached for '{func.__name__}': {e}")
raise
if logger:
logger.warning(
f"🔄 Retry {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{e}'."
)
time.sleep(current_delay)
retries += 1
current_delay *= backoff
return async_wrapper if is_coroutine else sync_wrapper
return decorator
@retry(max_retries=10, delay=1, spinner_text="Trying risky thing...")
def might_fail():
time.sleep(4)
if random.random() < 0.6:
raise ValueError("Simulated failure")
return "🎉 Success!"
result = might_fail()
print(result)

498
menu/colors.py Normal file
View File

@ -0,0 +1,498 @@
"""
colors.py
A Python module that integrates the Nord color palette with the Rich library.
It defines a metaclass-based NordColors class allowing dynamic attribute lookups
(e.g., NORD12bu -> "#D08770 bold underline") and provides a comprehensive Nord-based
Theme that customizes Rich's default styles.
Features:
- All core Nord colors (NORD0 through NORD15), plus named aliases (Polar Night,
Snow Storm, Frost, Aurora).
- A dynamic metaclass (NordMeta) that enables usage of 'NORD1b', 'NORD1_biudrs', etc.
to return color + bold/italic/underline/dim/reverse/strike flags for Rich.
- A ready-to-use Theme (get_nord_theme) mapping Rich's default styles to Nord colors.
Example dynamic usage:
console.print("Hello!", style=NordColors.NORD12bu)
# => Renders "Hello!" in #D08770 (Nord12) plus bold and underline styles
"""
import re
from difflib import get_close_matches
from rich.style import Style
from rich.theme import Theme
from rich.console import Console
class ColorsMeta(type):
"""
A metaclass that catches attribute lookups like `NORD12buidrs` or `ORANGE_b` and returns
a string combining the base color + bold/italic/underline/dim/reverse/strike flags.
"""
_STYLE_MAP = {
"b": "bold",
"i": "italic",
"u": "underline",
"d": "dim",
"r": "reverse",
"s": "strike",
}
_cache: dict = {}
def __getattr__(cls, name: str) -> str:
"""
Intercepts attributes like 'NORD12b' or 'POLAR_NIGHT_BRIGHT_biu'.
Splits into a valid base color attribute (e.g. 'POLAR_NIGHT_BRIGHT') and suffix
characters 'b', 'i', 'u', 'd', 'r', 's' which map to 'bold', 'italic', 'underline',
'dim', 'reverse', 'strike'.
Returns a string Rich can parse: e.g. '#3B4252 bold italic underline'.
Raises an informative AttributeError if invalid base or style flags are used.
"""
if name in cls._cache:
return cls._cache[name]
match = re.match(r"([A-Z]+(?:_[A-Z]+)*[0-9]*)(?:_)?([biudrs]*)", name)
if not match:
raise AttributeError(
f"'{cls.__name__}' has no attribute '{name}'.\n"
f"Expected format: BASE[_]?FLAGS, where BASE is uppercase letters/underscores/digits, "
f"and FLAGS ∈ {{'b', 'i', 'u', 'd', 'r', 's'}}."
)
base, suffix = match.groups()
try:
color_value = type.__getattribute__(cls, base)
except AttributeError:
error_msg = [f"'{cls.__name__}' has no color named '{base}'."]
valid_bases = [
key for key, val in cls.__dict__.items() if isinstance(val, str) and
not key.startswith("__")
]
suggestions = get_close_matches(base, valid_bases, n=1, cutoff=0.5)
if suggestions:
error_msg.append(f"Did you mean '{suggestions[0]}'?")
if valid_bases:
error_msg.append("Valid base color names include: " + ", ".join(valid_bases))
raise AttributeError(" ".join(error_msg)) from None
if not isinstance(color_value, str):
raise AttributeError(
f"'{cls.__name__}.{base}' is not a string color.\n"
f"Make sure that attribute actually contains a color string."
)
unique_flags = set(suffix)
styles = []
for letter in unique_flags:
mapped_style = cls._STYLE_MAP.get(letter)
if mapped_style:
styles.append(mapped_style)
else:
raise AttributeError(f"Unknown style flag '{letter}' in attribute '{name}'")
order = {"b": 1, "i": 2, "u": 3, "d": 4, "r": 5, "s": 6}
styles_sorted = sorted(styles, key=lambda s: order[s[0]])
if styles_sorted:
style_string = f"{color_value} {' '.join(styles_sorted)}"
else:
style_string = color_value
cls._cache[name] = style_string
return style_string
class OneColors(metaclass=ColorsMeta):
BLACK = "#282C34"
GUTTER_GREY = "#4B5263"
COMMENT_GREY = "#5C6370"
WHITE = "#ABB2BF"
DARK_RED = "#BE5046"
LIGHT_RED = "#E06C75"
DARK_YELLOW = "#D19A66"
LIGHT_YELLOW = "#E5C07B"
GREEN = "#98C379"
CYAN = "#56B6C2"
BLUE = "#61AFEF"
MAGENTA = "#C678DD"
@classmethod
def as_dict(cls):
"""
Returns a dictionary mapping every NORD* attribute
(e.g. 'NORD0') to its hex code.
"""
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if not callable(getattr(cls, attr)) and
not attr.startswith("__")
}
class NordColors(metaclass=ColorsMeta):
"""
Defines the Nord color palette as class attributes.
Each color is labeled by its canonical Nord name (NORD0-NORD15)
and also has useful aliases grouped by theme:
- Polar Night
- Snow Storm
- Frost
- Aurora
"""
# Polar Night
NORD0 = "#2E3440"
NORD1 = "#3B4252"
NORD2 = "#434C5E"
NORD3 = "#4C566A"
# Snow Storm
NORD4 = "#D8DEE9"
NORD5 = "#E5E9F0"
NORD6 = "#ECEFF4"
# Frost
NORD7 = "#8FBCBB"
NORD8 = "#88C0D0"
NORD9 = "#81A1C1"
NORD10 = "#5E81AC"
# Aurora
NORD11 = "#BF616A"
NORD12 = "#D08770"
NORD13 = "#EBCB8B"
NORD14 = "#A3BE8C"
NORD15 = "#B48EAD"
POLAR_NIGHT_ORIGIN = NORD0
POLAR_NIGHT_BRIGHT = NORD1
POLAR_NIGHT_BRIGHTER = NORD2
POLAR_NIGHT_BRIGHTEST = NORD3
SNOW_STORM_BRIGHT = NORD4
SNOW_STORM_BRIGHTER = NORD5
SNOW_STORM_BRIGHTEST = NORD6
FROST_TEAL = NORD7
FROST_ICE = NORD8
FROST_SKY = NORD9
FROST_DEEP = NORD10
RED = NORD11
ORANGE = NORD12
YELLOW = NORD13
GREEN = NORD14
PURPLE = NORD15
MAGENTA = NORD15
BLUE = NORD10
CYAN = NORD8
@classmethod
def as_dict(cls):
"""
Returns a dictionary mapping every NORD* attribute
(e.g. 'NORD0') to its hex code.
"""
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if attr.startswith("NORD") and
not callable(getattr(cls, attr))
}
@classmethod
def aliases(cls):
"""
Returns a dictionary of *all* other aliases
(Polar Night, Snow Storm, Frost, Aurora).
"""
skip_prefixes = ("NORD", "__")
alias_names = [
attr for attr in dir(cls)
if not any(attr.startswith(sp) for sp in skip_prefixes)
and not callable(getattr(cls, attr))
]
return {name: getattr(cls, name) for name in alias_names}
NORD_THEME_STYLES: dict[str, Style] = {
# ---------------------------------------------------------------
# Base / Structural styles
# ---------------------------------------------------------------
"none": Style.null(),
"reset": Style(
color="default",
bgcolor="default",
dim=False,
bold=False,
italic=False,
underline=False,
blink=False,
blink2=False,
reverse=False,
conceal=False,
strike=False,
),
"dim": Style(dim=True),
"bright": Style(dim=False),
"bold": Style(bold=True),
"strong": Style(bold=True),
"code": Style(reverse=True, bold=True),
"italic": Style(italic=True),
"emphasize": Style(italic=True),
"underline": Style(underline=True),
"blink": Style(blink=True),
"blink2": Style(blink2=True),
"reverse": Style(reverse=True),
"strike": Style(strike=True),
# ---------------------------------------------------------------
# Basic color names mapped to Nord
# ---------------------------------------------------------------
"black": Style(color=NordColors.POLAR_NIGHT_ORIGIN),
"red": Style(color=NordColors.RED),
"green": Style(color=NordColors.GREEN),
"yellow": Style(color=NordColors.YELLOW),
"magenta": Style(color=NordColors.MAGENTA),
"purple": Style(color=NordColors.PURPLE),
"cyan": Style(color=NordColors.CYAN),
"blue": Style(color=NordColors.BLUE),
"white": Style(color=NordColors.SNOW_STORM_BRIGHTEST),
# ---------------------------------------------------------------
# Inspect
# ---------------------------------------------------------------
"inspect.attr": Style(color=NordColors.YELLOW, italic=True),
"inspect.attr.dunder": Style(color=NordColors.YELLOW, italic=True, dim=True),
"inspect.callable": Style(bold=True, color=NordColors.RED),
"inspect.async_def": Style(italic=True, color=NordColors.FROST_ICE),
"inspect.def": Style(italic=True, color=NordColors.FROST_ICE),
"inspect.class": Style(italic=True, color=NordColors.FROST_ICE),
"inspect.error": Style(bold=True, color=NordColors.RED),
"inspect.equals": Style(),
"inspect.help": Style(color=NordColors.FROST_ICE),
"inspect.doc": Style(dim=True),
"inspect.value.border": Style(color=NordColors.GREEN),
# ---------------------------------------------------------------
# Live / Layout
# ---------------------------------------------------------------
"live.ellipsis": Style(bold=True, color=NordColors.RED),
"layout.tree.row": Style(dim=False, color=NordColors.RED),
"layout.tree.column": Style(dim=False, color=NordColors.FROST_DEEP),
# ---------------------------------------------------------------
# Logging
# ---------------------------------------------------------------
"logging.keyword": Style(bold=True, color=NordColors.YELLOW),
"logging.level.notset": Style(dim=True),
"logging.level.debug": Style(color=NordColors.GREEN),
"logging.level.info": Style(color=NordColors.FROST_ICE),
"logging.level.warning": Style(color=NordColors.RED),
"logging.level.error": Style(color=NordColors.RED, bold=True),
"logging.level.critical": Style(color=NordColors.RED, bold=True, reverse=True),
"log.level": Style.null(),
"log.time": Style(color=NordColors.FROST_ICE, dim=True),
"log.message": Style.null(),
"log.path": Style(dim=True),
# ---------------------------------------------------------------
# Python repr
# ---------------------------------------------------------------
"repr.ellipsis": Style(color=NordColors.YELLOW),
"repr.indent": Style(color=NordColors.GREEN, dim=True),
"repr.error": Style(color=NordColors.RED, bold=True),
"repr.str": Style(color=NordColors.GREEN, italic=False, bold=False),
"repr.brace": Style(bold=True),
"repr.comma": Style(bold=True),
"repr.ipv4": Style(bold=True, color=NordColors.GREEN),
"repr.ipv6": Style(bold=True, color=NordColors.GREEN),
"repr.eui48": Style(bold=True, color=NordColors.GREEN),
"repr.eui64": Style(bold=True, color=NordColors.GREEN),
"repr.tag_start": Style(bold=True),
"repr.tag_name": Style(color=NordColors.PURPLE, bold=True),
"repr.tag_contents": Style(color="default"),
"repr.tag_end": Style(bold=True),
"repr.attrib_name": Style(color=NordColors.YELLOW, italic=False),
"repr.attrib_equal": Style(bold=True),
"repr.attrib_value": Style(color=NordColors.PURPLE, italic=False),
"repr.number": Style(color=NordColors.FROST_ICE, bold=True, italic=False),
"repr.number_complex": Style(color=NordColors.FROST_ICE, bold=True, italic=False),
"repr.bool_true": Style(color=NordColors.GREEN, italic=True),
"repr.bool_false": Style(color=NordColors.RED, italic=True),
"repr.none": Style(color=NordColors.PURPLE, italic=True),
"repr.url": Style(underline=True, color=NordColors.FROST_ICE, italic=False, bold=False),
"repr.uuid": Style(color=NordColors.YELLOW, bold=False),
"repr.call": Style(color=NordColors.PURPLE, bold=True),
"repr.path": Style(color=NordColors.PURPLE),
"repr.filename": Style(color=NordColors.PURPLE),
# ---------------------------------------------------------------
# Rule
# ---------------------------------------------------------------
"rule.line": Style(color=NordColors.GREEN),
"rule.text": Style.null(),
# ---------------------------------------------------------------
# JSON
# ---------------------------------------------------------------
"json.brace": Style(bold=True),
"json.bool_true": Style(color=NordColors.GREEN, italic=True),
"json.bool_false": Style(color=NordColors.RED, italic=True),
"json.null": Style(color=NordColors.PURPLE, italic=True),
"json.number": Style(color=NordColors.FROST_ICE, bold=True, italic=False),
"json.str": Style(color=NordColors.GREEN, italic=False, bold=False),
"json.key": Style(color=NordColors.FROST_ICE, bold=True),
# ---------------------------------------------------------------
# Prompt
# ---------------------------------------------------------------
"prompt": Style.null(),
"prompt.choices": Style(color=NordColors.PURPLE, bold=True),
"prompt.default": Style(color=NordColors.FROST_ICE, bold=True),
"prompt.invalid": Style(color=NordColors.RED),
"prompt.invalid.choice": Style(color=NordColors.RED),
# ---------------------------------------------------------------
# Pretty
# ---------------------------------------------------------------
"pretty": Style.null(),
# ---------------------------------------------------------------
# Scope
# ---------------------------------------------------------------
"scope.border": Style(color=NordColors.FROST_ICE),
"scope.key": Style(color=NordColors.YELLOW, italic=True),
"scope.key.special": Style(color=NordColors.YELLOW, italic=True, dim=True),
"scope.equals": Style(color=NordColors.RED),
# ---------------------------------------------------------------
# Table
# ---------------------------------------------------------------
"table.header": Style(bold=True),
"table.footer": Style(bold=True),
"table.cell": Style.null(),
"table.title": Style(italic=True),
"table.caption": Style(italic=True, dim=True),
# ---------------------------------------------------------------
# Traceback
# ---------------------------------------------------------------
"traceback.error": Style(color=NordColors.RED, italic=True),
"traceback.border.syntax_error": Style(color=NordColors.RED),
"traceback.border": Style(color=NordColors.RED),
"traceback.text": Style.null(),
"traceback.title": Style(color=NordColors.RED, bold=True),
"traceback.exc_type": Style(color=NordColors.RED, bold=True),
"traceback.exc_value": Style.null(),
"traceback.offset": Style(color=NordColors.RED, bold=True),
# ---------------------------------------------------------------
# Progress bars
# ---------------------------------------------------------------
"bar.back": Style(color=NordColors.POLAR_NIGHT_BRIGHTEST),
"bar.complete": Style(color=NordColors.RED),
"bar.finished": Style(color=NordColors.GREEN),
"bar.pulse": Style(color=NordColors.RED),
"progress.description": Style.null(),
"progress.filesize": Style(color=NordColors.GREEN),
"progress.filesize.total": Style(color=NordColors.GREEN),
"progress.download": Style(color=NordColors.GREEN),
"progress.elapsed": Style(color=NordColors.YELLOW),
"progress.percentage": Style(color=NordColors.PURPLE),
"progress.remaining": Style(color=NordColors.FROST_ICE),
"progress.data.speed": Style(color=NordColors.RED),
"progress.spinner": Style(color=NordColors.GREEN),
"status.spinner": Style(color=NordColors.GREEN),
# ---------------------------------------------------------------
# Tree
# ---------------------------------------------------------------
"tree": Style(),
"tree.line": Style(),
# ---------------------------------------------------------------
# Markdown
# ---------------------------------------------------------------
"markdown.paragraph": Style(),
"markdown.text": Style(),
"markdown.em": Style(italic=True),
"markdown.emph": Style(italic=True), # For commonmark compatibility
"markdown.strong": Style(bold=True),
"markdown.code": Style(bold=True, color=NordColors.FROST_ICE, bgcolor=NordColors.POLAR_NIGHT_ORIGIN),
"markdown.code_block": Style(color=NordColors.FROST_ICE, bgcolor=NordColors.POLAR_NIGHT_ORIGIN),
"markdown.block_quote": Style(color=NordColors.PURPLE),
"markdown.list": Style(color=NordColors.FROST_ICE),
"markdown.item": Style(),
"markdown.item.bullet": Style(color=NordColors.YELLOW, bold=True),
"markdown.item.number": Style(color=NordColors.YELLOW, bold=True),
"markdown.hr": Style(color=NordColors.YELLOW),
"markdown.h1.border": Style(),
"markdown.h1": Style(bold=True),
"markdown.h2": Style(bold=True, underline=True),
"markdown.h3": Style(bold=True),
"markdown.h4": Style(bold=True, dim=True),
"markdown.h5": Style(underline=True),
"markdown.h6": Style(italic=True),
"markdown.h7": Style(italic=True, dim=True),
"markdown.link": Style(color=NordColors.FROST_ICE),
"markdown.link_url": Style(color=NordColors.FROST_SKY, underline=True),
"markdown.s": Style(strike=True),
# ---------------------------------------------------------------
# ISO8601
# ---------------------------------------------------------------
"iso8601.date": Style(color=NordColors.FROST_ICE),
"iso8601.time": Style(color=NordColors.PURPLE),
"iso8601.timezone": Style(color=NordColors.YELLOW),
}
def get_nord_theme() -> Theme:
"""
Returns a Rich Theme for the Nord color palette.
"""
return Theme(NORD_THEME_STYLES)
if __name__ == "__main__":
console = Console(theme=get_nord_theme(), color_system="truecolor")
# Basic demonstration of the Nord theme
console.print("Welcome to the [bold underline]Nord Themed[/] console!\n")
console.print("1) This is default text (no style).")
console.print("2) This is [red]red[/].")
console.print("3) This is [green]green[/].")
console.print("4) This is [blue]blue[/] (maps to Frost).")
console.print("5) And here's some [bold]Bold text[/] and [italic]italic text[/].\n")
console.log("Log example in info mode.")
console.log("Another log, with a custom style", style="logging.level.warning")
# Demonstrate the dynamic attribute usage
console.print(
"6) Demonstrating dynamic attribute [NORD3bu]: This text should be bold, underlined, "
"and use Nord3's color (#4C566A).",
style=NordColors.NORD3bu,
)
console.print()
# Show how the custom attribute can fail gracefully
try:
console.print("7) Attempting invalid suffix [NORD3z]:", style=NordColors.NORD3z)
except AttributeError as error:
console.print(f"Caught error: {error}", style="red")
# Demonstrate a traceback style:
console.print("\n8) Raising and displaying a traceback with Nord styling:\n", style="bold")
try:
raise ValueError("Nord test exception!")
except ValueError:
console.print_exception(show_locals=True)
console.print("\nEnd of Nord theme demo!", style="bold")

66
menu/hook_manager.py Normal file
View File

@ -0,0 +1,66 @@
"""hook_manager.py"""
from __future__ import annotations
import inspect
import logging
from typing import (Any, Awaitable, Callable, Dict, List, Optional, TypedDict,
Union, TYPE_CHECKING)
if TYPE_CHECKING:
from action import BaseAction
from option import Option
logger = logging.getLogger("menu")
class HookContext(TypedDict, total=False):
name: str
args: tuple[Any, ...]
kwargs: dict[str, Any]
result: Any | None
exception: Exception | None
option: Option | None
action: BaseAction | None
Hook = Union[Callable[[HookContext], None], Callable[[HookContext], Awaitable[None]]]
class HookManager:
def __init__(self):
self._hooks: Dict[str, List[Hook]] = {
"before": [],
"after": [],
"on_error": [],
"on_teardown": [],
}
def register(self, hook_type: str, hook: Hook):
if hook_type not in self._hooks:
raise ValueError(f"Unsupported hook type: {hook_type}")
self._hooks[hook_type].append(hook)
def clear(self, hook_type: Optional[str] = None):
if hook_type:
self._hooks[hook_type] = []
else:
for k in self._hooks:
self._hooks[k] = []
async def trigger(self, hook_type: str, context: HookContext):
if hook_type not in self._hooks:
raise ValueError(f"Unsupported hook type: {hook_type}")
for hook in self._hooks[hook_type]:
try:
if inspect.iscoroutinefunction(hook):
await hook(context)
else:
hook(context)
except Exception as hook_error:
name = context.get("name", "<unnamed>")
logger.warning(f"⚠️ Hook '{hook.__name__}' raised an exception during '{hook_type}'"
f" for '{name}': {hook_error}")
if hook_type == "on_error":
raise context.get("exception") from hook_error

183
menu/hooks.py Normal file
View File

@ -0,0 +1,183 @@
import functools
import logging
import random
import time
from hook_manager import HookContext
from menu_utils import run_async
logger = logging.getLogger("menu")
def log_before(context: dict) -> None:
name = context.get("name", "<unnamed>")
option = context.get("option")
if option:
logger.info(f"🚀 Starting action '{option.description}' (key='{option.key}')")
else:
logger.info(f"🚀 Starting action '{name}'")
def log_after(context: dict) -> None:
name = context.get("name", "<unnamed>")
duration = context.get("duration")
if duration is not None:
logger.info(f"✅ Completed '{name}' in {duration:.2f}s")
else:
logger.info(f"✅ Completed '{name}'")
def log_error(context: dict) -> None:
name = context.get("name", "<unnamed>")
error = context.get("exception")
duration = context.get("duration")
if duration is not None:
logger.error(f"❌ Error '{name}' after {duration:.2f}s: {error}")
else:
logger.error(f"❌ Error '{name}': {error}")
class CircuitBreakerOpen(Exception):
"""Exception raised when the circuit breaker is open."""
class CircuitBreaker:
def __init__(self, max_failures=3, reset_timeout=10):
self.max_failures = max_failures
self.reset_timeout = reset_timeout
self.failures = 0
self.open_until = None
def before_hook(self, context: HookContext):
name = context.get("name", "<unnamed>")
if self.open_until:
if time.time() < self.open_until:
raise CircuitBreakerOpen(f"🔴 Circuit open for '{name}' until {time.ctime(self.open_until)}.")
else:
logger.info(f"🟢 Circuit closed again for '{name}'.")
self.failures = 0
self.open_until = None
def error_hook(self, context: HookContext):
name = context.get("name", "<unnamed>")
self.failures += 1
logger.warning(f"⚠️ CircuitBreaker: '{name}' failure {self.failures}/{self.max_failures}.")
if self.failures >= self.max_failures:
self.open_until = time.time() + self.reset_timeout
logger.error(f"🔴 Circuit opened for '{name}' until {time.ctime(self.open_until)}.")
def after_hook(self, context: HookContext):
self.failures = 0
def is_open(self):
return self.open_until is not None and time.time() < self.open_until
def reset(self):
self.failures = 0
self.open_until = None
logger.info("🔄 Circuit reset.")
class RetryHandler:
def __init__(self, max_retries=5, delay=1, backoff=2):
self.max_retries = max_retries
self.delay = delay
self.backoff = backoff
def retry_on_error(self, context: HookContext):
name = context.get("name", "<unnamed>")
error = context.get("exception")
option = context.get("option")
action = context.get("action")
retries_done = 0
current_delay = self.delay
last_error = error
if not (option or action):
logger.warning(f"⚠️ RetryHandler: No Option or Action in context for '{name}'. Skipping retry.")
return
target = option or action
while retries_done < self.max_retries:
try:
retries_done += 1
logger.info(f"🔄 Retrying '{name}' ({retries_done}/{self.max_retries}) in {current_delay}s due to '{last_error}'...")
time.sleep(current_delay)
result = target(*context.get("args", ()), **context.get("kwargs", {}))
if option:
option.set_result(result)
context["result"] = result
context["duration"] = target.get_duration() or 0.0
context.pop("exception", None)
logger.info(f"✅ Retry succeeded for '{name}' on attempt {retries_done}.")
if hasattr(target, "hooks"):
run_async(target.hooks.trigger("after", context))
return
except Exception as retry_error:
logger.warning(f"⚠️ Retry attempt {retries_done} for '{name}' failed due to '{retry_error}'.")
last_error = retry_error
current_delay *= self.backoff
logger.exception(f"'{name}' failed after {self.max_retries} retries.")
raise last_error
def retry(max_retries=3, delay=1, backoff=2, exceptions=(Exception,), logger=None):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
retries, current_delay = 0, delay
while retries <= max_retries:
try:
return func(*args, **kwargs)
except exceptions as e:
if retries == max_retries:
if logger:
logger.exception(f"❌ Max retries reached for '{func.__name__}': {e}")
raise
if logger:
logger.warning(
f"🔄 Retry {retries + 1}/{max_retries} for '{func.__name__}' after {current_delay}s due to '{e}'."
)
time.sleep(current_delay)
retries += 1
current_delay *= backoff
return wrapper
return decorator
def setup_hooks(menu):
menu.add_before(log_before)
menu.add_after(log_after)
menu.add_on_error(log_error)
if __name__ == "__main__":
from menu import Menu
def risky_task():
if random.random() > 0.1:
time.sleep(1)
raise ValueError("Random failure occurred")
print("Task succeeded!")
breaker = CircuitBreaker(max_failures=2, reset_timeout=10)
retry_handler = RetryHandler(max_retries=30, delay=2, backoff=2)
menu = Menu(never_confirm=True)
menu.add_before(log_before)
menu.add_after(log_after)
menu.add_on_error(log_error)
menu.add_option(
key="CR",
description="Retry with CircuitBreaker",
action=risky_task,
before_hooks=[breaker.before_hook],
after_hooks=[breaker.after_hook],
error_hooks=[retry_handler.retry_on_error, breaker.error_hook],
)
menu.run()

40
menu/logging_utils.py Normal file
View File

@ -0,0 +1,40 @@
import logging
from rich.logging import RichHandler
def setup_logging(
log_filename: str = "menu.log",
console_log_level: int = logging.DEBUG,
file_log_level: int = logging.DEBUG,
):
"""Set up logging configuration with separate console and file handlers."""
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
if root_logger.hasHandlers():
root_logger.handlers.clear()
console_handler = RichHandler(
rich_tracebacks=True,
show_time=True,
show_level=True,
show_path=False,
markup=True,
log_time_format="[%Y-%m-%d %H:%M:%S]",
)
console_handler.setLevel(console_log_level)
file_handler = logging.FileHandler(log_filename)
file_handler.setLevel(file_log_level)
file_formatter = logging.Formatter(
"%(asctime)s [%(name)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(file_formatter)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
menu_logger = logging.getLogger("menu")
menu_logger.setLevel(console_log_level)
menu_logger.propagate = True

29
menu/main.py Normal file
View File

@ -0,0 +1,29 @@
import logging
from rich.traceback import install
from logging_utils import setup_logging
from menu import Menu
from hooks import setup_hooks, CircuitBreaker, RetryHandler
from task import risky_task
install(show_locals=True, width=120)
setup_logging()
logger = logging.getLogger("menu")
menu = Menu(title="Main Menu", never_confirm=True)
setup_hooks(menu)
breaker = CircuitBreaker(max_failures=2, reset_timeout=10)
retry_handler = RetryHandler(max_retries=30, delay=2, backoff=2)
menu.add_option(
"1",
"Run Risky Task",
risky_task,
before_hooks=[breaker.before_hook],
after_hooks=[breaker.after_hook],
error_hooks=[retry_handler.retry_on_error, breaker.error_hook],
)
if __name__ == "__main__":
result = menu.run_headless("1")
logger.info(f"Headless execution returned: {result}")

432
menu/menu.py Normal file
View File

@ -0,0 +1,432 @@
"""menu.py
This class creates a Menu object that creates a selectable menu
with customizable options and functionality.
It allows for adding options, and their accompanying actions,
and provides a method to display the menu and handle user input.
This class uses the `rich` library to display the menu in a
formatted and visually appealing way.
This class also uses the `prompt_toolkit` library to handle
user input and create an interactive experience.
"""
import asyncio
import logging
from functools import cached_property
from typing import Any, Callable
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.formatted_text import AnyFormattedText
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.shortcuts import confirm
from prompt_toolkit.validation import Validator
from rich import box
from rich.console import Console
from rich.markdown import Markdown
from rich.table import Table
from action import BaseAction
from bottom_bar import BottomBar
from colors import get_nord_theme
from hook_manager import HookManager
from menu_utils import (CaseInsensitiveDict, InvalidActionError, MenuError,
NotAMenuError, OptionAlreadyExistsError, chunks, run_async)
from one_colors import OneColors
from option import Option
logger = logging.getLogger("menu")
class Menu:
"""Class to create a menu with options.
Hook functions must have the signature:
def hook(option: Option) -> None:
where `option` is the selected option.
Error hook functions must have the signature:
def error_hook(option: Option, error: Exception) -> None:
where `option` is the selected option and `error` is the exception raised.
Hook execution order:
1. Before action hooks of the menu.
2. Before action hooks of the selected option.
3. Action of the selected option.
4. After action hooks of the selected option.
5. After action hooks of the menu.
6. On error hooks of the selected option (if an error occurs).
7. On error hooks of the menu (if an error occurs).
Parameters:
title (str|Markdown): The title of the menu.
columns (int): The number of columns to display the options in.
prompt (AnyFormattedText): The prompt to display when asking for input.
bottom_bar (str|callable|None): The text to display in the bottom bar.
"""
def __init__(
self,
title: str | Markdown = "Menu",
prompt: str | AnyFormattedText = "> ",
columns: int = 3,
bottom_bar: str | Callable[[], None] | None = None,
welcome_message: str | Markdown = "",
exit_message: str | Markdown = "",
run_hooks_on_back_option: bool = False,
continue_on_error_prompt: bool = True,
never_confirm: bool = False,
_verbose: bool = False,
) -> None:
"""Initializes the Menu object."""
self.title: str | Markdown = title
self.prompt: str | AnyFormattedText = prompt
self.columns: int = columns
self.bottom_bar: str | Callable[[], None] | None = bottom_bar or BottomBar(columns=columns)
self.options: dict[str, Option] = CaseInsensitiveDict()
self.back_option: Option = self._get_back_option()
self.console: Console = Console(color_system="truecolor", theme=get_nord_theme())
#self.session: PromptSession = self._get_prompt_session()
self.welcome_message: str | Markdown = welcome_message
self.exit_message: str | Markdown = exit_message
self.hooks: HookManager = HookManager()
self.run_hooks_on_back_option: bool = run_hooks_on_back_option
self.continue_on_error_prompt: bool = continue_on_error_prompt
self._never_confirm: bool = never_confirm
self._verbose: bool = _verbose
self.last_run_option: Option | None = None
self.key_bindings: KeyBindings = KeyBindings()
self.toggles: dict[str, str] = {}
def get_title(self) -> str:
"""Returns the string title of the menu."""
if isinstance(self.title, str):
return self.title
elif isinstance(self.title, Markdown):
return self.title.markup
return self.title
def _get_back_option(self) -> Option:
"""Returns the back option for the menu."""
return Option(key="0", description="Back", color=OneColors.DARK_RED)
def _get_completer(self) -> WordCompleter:
"""Completer to provide auto-completion for the menu options."""
return WordCompleter([*self.options.keys(), self.back_option.key], ignore_case=True)
def _get_validator(self) -> Validator:
"""Validator to check if the input is a valid option."""
valid_keys = {key.upper() for key in self.options.keys()} | {self.back_option.key.upper()}
valid_keys_str = ", ".join(sorted(valid_keys))
return Validator.from_callable(
lambda text: text.upper() in valid_keys,
error_message=f"Invalid option. Valid options are: {valid_keys_str}",
move_cursor_to_end=True,
)
def _invalidate_table_cache(self):
"""Forces the table to be recreated on the next access."""
if hasattr(self, "table"):
del self.table
def _refresh_session(self):
"""Refreshes the prompt session to apply any changes."""
self.session.completer = self._get_completer()
self.session.validator = self._get_validator()
self._invalidate_table_cache()
@cached_property
def session(self) -> PromptSession:
"""Returns the prompt session for the menu."""
return PromptSession(
message=self.prompt,
multiline=False,
completer=self._get_completer(),
reserve_space_for_menu=1,
validator=self._get_validator(),
bottom_toolbar=self.bottom_bar.render,
)
def add_toggle(self, key: str, label: str, state: bool = False):
if key in self.options or key in self.toggles:
raise ValueError(f"Key '{key}' is already in use.")
self.toggles[key] = label
self.bottom_bar.add_toggle(label, label, state)
@self.key_bindings.add(key)
def _(event):
current = self.bottom_bar._states[label][1]
self.bottom_bar.update_toggle(label, not current)
self.console.print(f"Toggled [{label}] to {'ON' if not current else 'OFF'}")
def add_counter(self, name: str, label: str, current: int, total: int):
self.bottom_bar.add_counter(name, label, current, total)
def update_counter(self, name: str, current: int | None = None, total: int | None = None):
self.bottom_bar.update_counter(name, current=current, total=total)
def update_toggle(self, name: str, state: bool):
self.bottom_bar.update_toggle(name, state)
def debug_hooks(self) -> None:
if not self._verbose:
return
def hook_names(hook_list):
return [hook.__name__ for hook in hook_list]
logger.debug(f"Menu-level before hooks: {hook_names(self.hooks._hooks['before'])}")
logger.debug(f"Menu-level after hooks: {hook_names(self.hooks._hooks['after'])}")
logger.debug(f"Menu-level error hooks: {hook_names(self.hooks._hooks['on_error'])}")
for key, option in self.options.items():
logger.debug(f"[Option '{key}'] before: {hook_names(option.hooks._hooks['before'])}")
logger.debug(f"[Option '{key}'] after: {hook_names(option.hooks._hooks['after'])}")
logger.debug(f"[Option '{key}'] error: {hook_names(option.hooks._hooks['on_error'])}")
def _validate_option_key(self, key: str) -> None:
"""Validates the option key to ensure it is unique."""
if key.upper() in self.options or key.upper() == self.back_option.key.upper():
raise OptionAlreadyExistsError(f"Option with key '{key}' already exists.")
def update_back_option(
self,
key: str = "0",
description: str = "Back",
action: Callable[[], Any] = lambda: None,
color: str = OneColors.DARK_RED,
confirm: bool = False,
confirm_message: str = "Are you sure?",
) -> None:
"""Updates the back option of the menu."""
if not callable(action):
raise InvalidActionError("Action must be a callable.")
self._validate_option_key(key)
self.back_option = Option(
key=key,
description=description,
action=action,
color=color,
confirm=confirm,
confirm_message=confirm_message,
)
self._refresh_session()
def add_submenu(self, key: str, description: str, submenu: "Menu", color: str = OneColors.CYAN) -> None:
"""Adds a submenu to the menu."""
if not isinstance(submenu, Menu):
raise NotAMenuError("submenu must be an instance of Menu.")
self._validate_option_key(key)
self.add_option(key, description, submenu.run, color)
self._refresh_session()
def add_options(self, options: list[dict]) -> None:
"""Adds multiple options to the menu."""
for option in options:
self.add_option(**option)
def add_option(
self,
key: str,
description: str,
action: BaseAction | Callable[[], Any],
color: str = OneColors.WHITE,
confirm: bool = False,
confirm_message: str = "Are you sure?",
spinner: bool = False,
spinner_message: str = "Processing...",
spinner_type: str = "dots",
spinner_style: str = OneColors.CYAN,
spinner_kwargs: dict[str, Any] | None = None,
before_hooks: list[Callable] | None = None,
after_hooks: list[Callable] | None = None,
error_hooks: list[Callable] | None = None,
) -> Option:
"""Adds an option to the menu, preventing duplicates."""
spinner_kwargs: dict[str, Any] = spinner_kwargs or {}
self._validate_option_key(key)
option = Option(
key=key,
description=description,
action=action,
color=color,
confirm=confirm,
confirm_message=confirm_message,
spinner=spinner,
spinner_message=spinner_message,
spinner_type=spinner_type,
spinner_style=spinner_style,
spinner_kwargs=spinner_kwargs,
)
for hook in before_hooks or []:
option.hooks.register("before", hook)
for hook in after_hooks or []:
option.hooks.register("after", hook)
for hook in error_hooks or []:
option.hooks.register("on_error", hook)
self.options[key] = option
self._refresh_session()
return option
@cached_property
def table(self) -> Table:
"""Creates a rich table to display the menu options."""
table = Table(title=self.title, show_header=False, box=box.SIMPLE, expand=True)
for chunk in chunks(self.options.items(), self.columns):
row = []
for key, option in chunk:
row.append(f"[{key}] [{option.color}]{option.description}")
table.add_row(*row)
table.add_row(f"[{self.back_option.key}] [{self.back_option.color}]{self.back_option.description}")
return table
def get_option(self, choice: str) -> Option | None:
"""Returns the selected option based on user input."""
if choice.upper() == self.back_option.key.upper():
return self.back_option
return self.options.get(choice)
def _should_run_action(self, selected_option: Option) -> bool:
if selected_option.confirm and not self._never_confirm:
return confirm(selected_option.confirm_message)
return True
def _create_context(self, selected_option: Option) -> dict[str, Any]:
"""Creates a context dictionary for the selected option."""
return {
"name": selected_option.description,
"option": selected_option,
"args": (),
"kwargs": {},
}
def _run_action_with_spinner(self, option: Option) -> Any:
"""Runs the action of the selected option with a spinner."""
with self.console.status(
option.spinner_message,
spinner=option.spinner_type,
spinner_style=option.spinner_style,
**option.spinner_kwargs,
):
return option()
def _handle_action_error(self, selected_option: Option, error: Exception) -> bool:
"""Handles errors that occur during the action of the selected option."""
logger.exception(f"Error executing '{selected_option.description}': {error}")
self.console.print(f"[{OneColors.DARK_RED}]An error occurred while executing "
f"{selected_option.description}:[/] {error}")
if self.continue_on_error_prompt and not self._never_confirm:
return confirm("An error occurred. Do you wish to continue?")
if self._never_confirm:
return True
return False
def process_action(self) -> bool:
"""Processes the action of the selected option."""
choice = self.session.prompt()
selected_option = self.get_option(choice)
self.last_run_option = selected_option
if selected_option == self.back_option:
logger.info(f"🔙 Back selected: exiting {self.get_title()}")
return False
if not self._should_run_action(selected_option):
logger.info(f"[{OneColors.DARK_RED}] {selected_option.description} cancelled.")
return True
context = self._create_context(selected_option)
try:
run_async(self.hooks.trigger("before", context))
if selected_option.spinner:
result = self._run_action_with_spinner(selected_option)
else:
result = selected_option()
selected_option.set_result(result)
context["result"] = result
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("after", context))
except Exception as error:
context["exception"] = error
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("on_error", context))
if "exception" not in context:
logger.info(f"✅ Recovery hook handled error for '{selected_option.description}'")
return True
return self._handle_action_error(selected_option, error)
return True
def run_headless(self, option_key: str, never_confirm: bool | None = None) -> Any:
"""Runs the action of the selected option without displaying the menu."""
self.debug_hooks()
if never_confirm is not None:
self._never_confirm = never_confirm
selected_option = self.get_option(option_key)
self.last_run_option = selected_option
if not selected_option:
logger.info("[Headless] Back option selected. Exiting menu.")
return
logger.info(f"[Headless] 🚀 Running: '{selected_option.description}'")
if not self._should_run_action(selected_option):
raise MenuError(f"[Headless] '{selected_option.description}' cancelled by confirmation.")
context = self._create_context(selected_option)
try:
run_async(self.hooks.trigger("before", context))
if selected_option.spinner:
result = self._run_action_with_spinner(selected_option)
else:
result = selected_option()
selected_option.set_result(result)
context["result"] = result
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("after", context))
logger.info(f"[Headless] ✅ '{selected_option.description}' complete.")
except (KeyboardInterrupt, EOFError):
raise MenuError(f"[Headless] ⚠️ '{selected_option.description}' interrupted by user.")
except Exception as error:
context["exception"] = error
context["duration"] = selected_option.get_duration()
run_async(self.hooks.trigger("on_error", context))
if "exception" not in context:
logger.info(f"[Headless] ✅ Recovery hook handled error for '{selected_option.description}'")
return True
continue_running = self._handle_action_error(selected_option, error)
if not continue_running:
raise MenuError(f"[Headless] ❌ '{selected_option.description}' failed.") from error
return selected_option.get_result()
def run(self) -> None:
"""Runs the menu and handles user input."""
logger.info(f"Running menu: {self.get_title()}")
self.debug_hooks()
if self.welcome_message:
self.console.print(self.welcome_message)
while True:
self.console.print(self.table)
try:
if not self.process_action():
break
except (EOFError, KeyboardInterrupt):
logger.info(f"[{OneColors.DARK_RED}]EOF or KeyboardInterrupt. Exiting menu.")
break
logger.info(f"Exiting menu: {self.get_title()}")
if self.exit_message:
self.console.print(self.exit_message)

79
menu/menu_utils.py Normal file
View File

@ -0,0 +1,79 @@
import asyncio
import time
from itertools import islice
def chunks(iterator, size):
"""Yield successive n-sized chunks from an iterator."""
iterator = iter(iterator)
while True:
chunk = list(islice(iterator, size))
if not chunk:
break
yield chunk
def run_async(coro):
"""Run an async function in a synchronous context."""
try:
_ = asyncio.get_running_loop()
return asyncio.create_task(coro)
except RuntimeError:
return asyncio.run(coro)
class TimingMixin:
def _start_timer(self):
self.start_time = time.perf_counter()
def _stop_timer(self):
self.end_time = time.perf_counter()
self._duration = self.end_time - self.start_time
def get_duration(self) -> float | None:
return getattr(self, "_duration", None)
class MenuError(Exception):
"""Custom exception for the Menu class."""
class OptionAlreadyExistsError(MenuError):
"""Exception raised when an option with the same key already exists in the menu."""
class InvalidHookError(MenuError):
"""Exception raised when a hook is not callable."""
class InvalidActionError(MenuError):
"""Exception raised when an action is not callable."""
class NotAMenuError(MenuError):
"""Exception raised when the provided submenu is not an instance of Menu."""
class CaseInsensitiveDict(dict):
"""A case-insensitive dictionary that treats all keys as uppercase."""
def __setitem__(self, key, value):
super().__setitem__(key.upper(), value)
def __getitem__(self, key):
return super().__getitem__(key.upper())
def __contains__(self, key):
return super().__contains__(key.upper())
def get(self, key, default=None):
return super().get(key.upper(), default)
def pop(self, key, default=None):
return super().pop(key.upper(), default)
def update(self, other=None, **kwargs):
if other:
other = {k.upper(): v for k, v in other.items()}
kwargs = {k.upper(): v for k, v in kwargs.items()}
super().update(other, **kwargs)

28
menu/one_colors.py Normal file
View File

@ -0,0 +1,28 @@
from colors import ColorsMeta
class OneColors(metaclass=ColorsMeta):
BLACK = "#282C34"
GUTTER_GREY = "#4B5263"
COMMENT_GREY = "#5C6370"
WHITE = "#ABB2BF"
DARK_RED = "#BE5046"
LIGHT_RED = "#E06C75"
DARK_YELLOW = "#D19A66"
LIGHT_YELLOW = "#E5C07B"
GREEN = "#98C379"
CYAN = "#56B6C2"
BLUE = "#61AFEF"
MAGENTA = "#C678DD"
@classmethod
def as_dict(cls):
"""
Returns a dictionary mapping every NORD* attribute
(e.g. 'NORD0') to its hex code.
"""
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if not callable(getattr(cls, attr)) and
not attr.startswith("__")
}

110
menu/option.py Normal file
View File

@ -0,0 +1,110 @@
"""option.py
Any Action or Option is callable and supports the signature:
result = thing(*args, **kwargs)
This guarantees:
- Hook lifecycle (before/after/error/teardown)
- Timing
- Consistent return values
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Callable
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr
from action import BaseAction
from colors import OneColors
from hook_manager import HookManager
from menu_utils import TimingMixin, run_async
logger = logging.getLogger("menu")
class Option(BaseModel, TimingMixin):
"""Class representing an option in the menu.
Hooks must have the signature:
def hook(option: Option) -> None:
where `option` is the selected option.
Error hooks must have the signature:
def error_hook(option: Option, error: Exception) -> None:
where `option` is the selected option and `error` is the exception raised.
"""
key: str
description: str
action: BaseAction | Callable[[], Any] = lambda: None
color: str = OneColors.WHITE
confirm: bool = False
confirm_message: str = "Are you sure?"
spinner: bool = False
spinner_message: str = "Processing..."
spinner_type: str = "dots"
spinner_style: str = OneColors.CYAN
spinner_kwargs: dict[str, Any] = Field(default_factory=dict)
hooks: "HookManager" = Field(default_factory=HookManager)
start_time: float | None = None
end_time: float | None = None
_duration: float | None = PrivateAttr(default=None)
_result: Any | None = PrivateAttr(default=None)
model_config = ConfigDict(arbitrary_types_allowed=True)
def __str__(self):
return f"Option(key='{self.key}', description='{self.description}')"
def set_result(self, result: Any) -> None:
"""Set the result of the action."""
self._result = result
def get_result(self) -> Any:
"""Get the result of the action."""
return self._result
def __call__(self, *args, **kwargs) -> Any:
context = {
"name": self.description,
"duration": None,
"args": args,
"kwargs": kwargs,
"option": self,
}
self._start_timer()
try:
run_async(self.hooks.trigger("before", context))
result = self._execute_action(*args, **kwargs)
self.set_result(result)
context["result"] = result
return result
except Exception as error:
context["exception"] = error
run_async(self.hooks.trigger("on_error", context))
if "exception" not in context:
logger.info(f"✅ Recovery hook handled error for Option '{self.key}'")
return self.get_result()
raise
finally:
self._stop_timer()
context["duration"] = self.get_duration()
if "exception" not in context:
run_async(self.hooks.trigger("after", context))
run_async(self.hooks.trigger("on_teardown", context))
def _execute_action(self, *args, **kwargs) -> Any:
if isinstance(self.action, BaseAction):
return self.action(*args, **kwargs)
return self.action()
def dry_run(self):
print(f"[DRY RUN] Option '{self.key}' would run: {self.description}")
if isinstance(self.action, BaseAction):
self.action.dry_run()
elif callable(self.action):
print(f"[DRY RUN] Action is a raw callable: {self.action.__name__}")
else:
print("[DRY RUN] Action is not callable.")

9
menu/task.py Normal file
View File

@ -0,0 +1,9 @@
import random
import time
def risky_task() -> str:
if random.random() > 0.4:
time.sleep(1)
raise ValueError("Random failure occurred")
return "Task succeeded!"