This commit is contained in:
Roland Thomas Jr 2025-04-14 22:10:27 -04:00
parent 8763753be7
commit 7a5ed20b33
Signed by: roland
GPG Key ID: 7C3C2B085A4C2872
26 changed files with 3933 additions and 0 deletions

18
.gitignore vendored Normal file
View File

@ -0,0 +1,18 @@
__pycache__/
*.py[cod]
*.log
*.db
*.sqlite3
.env
.venv
.cache
.mypy_cache
.pytest_cache
dist/
build/
*.egg-info/
.idea/
.vscode/
coverage.xml
.coverage

145
README.md Normal file
View File

@ -0,0 +1,145 @@
# ⚔️ Falyx
**Falyx** is a resilient, introspectable CLI framework for building robust, asynchronous command-line workflows with:
- ✅ Modular action chaining and rollback
- 🔁 Built-in retry handling
- ⚙️ Full lifecycle hooks (before, after, success, error, teardown)
- 📊 Execution tracing, logging, and introspection
- 🧙‍♂️ Async-first design with Process support
- 🧩 Extensible CLI menus and customizable output
> Built for developers who value *clarity*, *resilience*, and *visibility* in their terminal workflows.
---
## ✨ Why Falyx?
Modern CLI tools deserve the same resilience as production systems. Falyx makes it easy to:
- Compose workflows using `Action`, `ChainedAction`, or `ActionGroup`
- Inject the result of one step into the next (`last_result`)
- Handle flaky operations with retries and exponential backoff
- Roll back safely on failure with structured undo logic
- Add observability with execution timing, result tracking, and hooks
- Run in both interactive or headless (scriptable) modes
- Customize output with Rich `Table`s (grouping, theming, etc.)
---
## 🔧 Installation
```bash
pip install falyx
```
> Or install from source:
```bash
git clone https://github.com/yourname/falyx.git
cd falyx
poetry install
```
---
## ⚡ Quick Example
```python
from falyx import Action, ChainedAction, Menu
from falyx.hooks import RetryHandler, log_success
async def flaky_step():
import random, asyncio
await asyncio.sleep(0.2)
if random.random() < 0.8:
raise RuntimeError("Random failure!")
return "ok"
retry = RetryHandler()
step1 = Action("Step 1", flaky_step)
step1.hooks.register("on_error", retry.retry_on_error)
step2 = Action("Step 2", flaky_step)
step2.hooks.register("on_error", retry.retry_on_error)
chain = ChainedAction("My Pipeline", [step1, step2])
chain.hooks.register("on_success", log_success)
menu = Menu(title="🚀 Falyx Demo")
menu.add_command("R", "Run My Pipeline", chain)
if __name__ == "__main__":
import asyncio
asyncio.run(menu.run())
```
---
## 📦 Core Features
- ✅ Async-native `Action`, `ChainedAction`, `ActionGroup`
- 🔁 Retry policies + exponential backoff
- ⛓ Rollbacks on chained failures
- 🎛️ Headless or interactive CLI with argparse and prompt_toolkit
- 📊 Built-in execution registry, result tracking, and timing
- 🧠 Supports `ProcessAction` for CPU-bound workloads
- 🧩 Custom `Table` rendering for CLI menu views
- 🔍 Hook lifecycle: `before`, `on_success`, `on_error`, `after`, `on_teardown`
---
## 🔍 Execution Trace
```bash
[2025-04-14 10:33:22] DEBUG [Step 1] ⚙ flaky_step()
[2025-04-14 10:33:22] INFO [Step 1] 🔁 Retrying (1/3) in 1.0s...
[2025-04-14 10:33:23] DEBUG [Step 1] ✅ Success | Result: ok
[2025-04-14 10:33:23] DEBUG [My Pipeline] ✅ Result: ['ok', 'ok']
```
---
## 🧩 Components
| Component | Purpose |
|------------------|--------------------------------------------------------|
| `Action` | Single async task with hook + result injection support |
| `ChainedAction` | Sequential task runner with rollback |
| `ActionGroup` | Parallel runner for independent tasks |
| `ProcessAction` | CPU-bound task in a separate process (multiprocessing) |
| `Menu` | CLI runner with toggleable prompt or headless mode |
| `ExecutionContext`| Captures metadata per execution |
| `HookManager` | Lifecycle hook registration engine |
---
## 🧠 Design Philosophy
> “Like a phalanx: organized, resilient, and reliable.”
Falyx is designed for developers who dont just want CLI tools to run — they want them to **fail meaningfully**, **recover gracefully**, and **log clearly**.
---
## 🛣️ Roadmap
- [ ] Retry policy DSL (e.g., `max_retries=3, backoff="exponential"`)
- [ ] Metrics export (Prometheus-style)
- [ ] Plugin system for menu extensions
- [ ] Native support for structured logs + log forwarding
- [ ] Web UI for interactive execution history (maybe!)
---
## 🧑‍💼 License
MIT — use it, fork it, improve it. Attribution appreciated!
---
## 🌐 falyx.dev — **reliable actions, resilient flows**
---

23
falyx/__init__.py Normal file
View File

@ -0,0 +1,23 @@
import logging
from .action import Action, ActionGroup, ChainedAction, ProcessAction
from .command import Command
from .context import ExecutionContext, ResultsContext
from .execution_registry import ExecutionRegistry
from .falyx import Falyx
logger = logging.getLogger("falyx")
__version__ = "0.1.0"
__all__ = [
"Action",
"ChainedAction",
"ActionGroup",
"ProcessAction",
"Falyx",
"Command",
"ExecutionContext",
"ResultsContext",
"ExecutionRegistry",
]

41
falyx/__main__.py Normal file
View File

@ -0,0 +1,41 @@
# falyx/__main__.py
import asyncio
import logging
from falyx.action import Action
from falyx.falyx import Falyx
def build_falyx() -> Falyx:
"""Build and return a Falyx instance with all your commands."""
app = Falyx(title="🚀 Falyx CLI")
# Example commands
app.add_command(
key="B",
description="Build project",
action=Action("Build", lambda: print("📦 Building...")),
tags=["build"]
)
app.add_command(
key="T",
description="Run tests",
action=Action("Test", lambda: print("🧪 Running tests...")),
tags=["test"]
)
app.add_command(
key="D",
description="Deploy project",
action=Action("Deploy", lambda: print("🚀 Deploying...")),
tags=["deploy"]
)
return app
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING)
falyx = build_falyx()
asyncio.run(falyx.cli())

441
falyx/action.py Normal file
View File

@ -0,0 +1,441 @@
"""action.py
Any Action or Command is callable and supports the signature:
result = thing(*args, **kwargs)
This guarantees:
- Hook lifecycle (before/after/error/teardown)
- Timing
- Consistent return values
"""
from __future__ import annotations
import asyncio
import random
from abc import ABC, abstractmethod
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import Any, Callable
from rich.console import Console
from rich.tree import Tree
from falyx.context import ExecutionContext, ResultsContext
from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.themes.colors import OneColors
from falyx.utils import ensure_async, logger
console = Console()
class BaseAction(ABC):
"""
Base class for actions. Actions can be simple functions or more
complex actions like `ChainedAction` or `ActionGroup`. They can also
be run independently or as part of Menu.
inject_last_result (bool): Whether to inject the previous action's result into kwargs.
inject_last_result_as (str): The name of the kwarg key to inject the result as
(default: 'last_result').
"""
def __init__(
self,
name: str,
hooks: HookManager | None = None,
inject_last_result: bool = False,
inject_last_result_as: str = "last_result",
logging_hooks: bool = False,
) -> None:
self.name = name
self.hooks = hooks or HookManager()
self.is_retryable: bool = False
self.results_context: ResultsContext | None = None
self.inject_last_result: bool = inject_last_result
self.inject_last_result_as: str = inject_last_result_as
if logging_hooks:
register_debug_hooks(self.hooks)
async def __call__(self, *args, **kwargs) -> Any:
return await self._run(*args, **kwargs)
@abstractmethod
async def _run(self, *args, **kwargs) -> Any:
raise NotImplementedError("_run must be implemented by subclasses")
@abstractmethod
async def preview(self, parent: Tree | None = None):
raise NotImplementedError("preview must be implemented by subclasses")
def set_results_context(self, results_context: ResultsContext):
self.results_context = results_context
def prepare_for_chain(self, results_context: ResultsContext) -> BaseAction:
"""
Prepare the action specifically for sequential (ChainedAction) execution.
Can be overridden for chain-specific logic.
"""
self.set_results_context(results_context)
return self
def prepare_for_group(self, results_context: ResultsContext) -> BaseAction:
"""
Prepare the action specifically for parallel (ActionGroup) execution.
Can be overridden for group-specific logic.
"""
self.set_results_context(results_context)
return self
def _maybe_inject_last_result(self, kwargs: dict[str, Any]) -> dict[str, Any]:
if self.inject_last_result and self.results_context:
key = self.inject_last_result_as
if key in kwargs:
logger.warning("[%s] ⚠️ Overriding '%s' with last_result", self.name, key)
kwargs = dict(kwargs)
kwargs[key] = self.results_context.last_result()
return kwargs
def register_hooks_recursively(self, hook_type: HookType, hook: Hook):
"""Register a hook for all actions and sub-actions."""
self.hooks.register(hook_type, hook)
def __str__(self):
return f"<{self.__class__.__name__} '{self.name}'>"
def __repr__(self):
return str(self)
class Action(BaseAction):
"""A simple action that runs a callable. It can be a function or a coroutine."""
def __init__(
self,
name: str,
action,
rollback=None,
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
hooks: HookManager | None = None,
inject_last_result: bool = False,
inject_last_result_as: str = "last_result",
) -> None:
super().__init__(name, hooks, inject_last_result, inject_last_result_as)
self.action = ensure_async(action)
self.rollback = rollback
self.args = args
self.kwargs = kwargs or {}
self.is_retryable = True
async def _run(self, *args, **kwargs) -> Any:
combined_args = args + self.args
combined_kwargs = self._maybe_inject_last_result({**self.kwargs, **kwargs})
context = ExecutionContext(
name=self.name,
args=combined_args,
kwargs=combined_kwargs,
action=self,
)
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
result = await self.action(*combined_args, **combined_kwargs)
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
if context.result is not None:
logger.info("[%s] ✅ Recovered: %s", self.name, self.name)
return context.result
raise error
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.GREEN_b}]⚙ Action[/] '{self.name}'"
if self.inject_last_result:
label += f" [dim](injects '{self.inject_last_result_as}')[/dim]"
if parent:
parent.add(label)
else:
console.print(Tree(label))
class ActionListMixin:
"""Mixin for managing a list of actions."""
def __init__(self) -> None:
self.actions: list[BaseAction] = []
def set_actions(self, actions: list[BaseAction]) -> None:
"""Replaces the current action list with a new one."""
self.actions.clear()
for action in actions:
self.add_action(action)
def add_action(self, action: BaseAction) -> None:
"""Adds an action to the list."""
self.actions.append(action)
def remove_action(self, name: str) -> None:
"""Removes an action by name."""
self.actions = [action for action in self.actions if action.name != name]
def has_action(self, name: str) -> bool:
"""Checks if an action with the given name exists."""
return any(action.name == name for action in self.actions)
def get_action(self, name: str) -> BaseAction | None:
"""Retrieves an action by name."""
for action in self.actions:
if action.name == name:
return action
return None
class ChainedAction(BaseAction, ActionListMixin):
"""A ChainedAction is a sequence of actions that are executed in order."""
def __init__(
self,
name: str,
actions: list[BaseAction] | None = None,
hooks: HookManager | None = None,
inject_last_result: bool = False,
inject_last_result_as: str = "last_result",
) -> None:
super().__init__(name, hooks, inject_last_result, inject_last_result_as)
ActionListMixin.__init__(self)
if actions:
self.set_actions(actions)
async def _run(self, *args, **kwargs) -> list[Any]:
results_context = ResultsContext(name=self.name)
if self.results_context:
results_context.add_result(self.results_context.last_result())
updated_kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(
name=self.name,
args=args,
kwargs=updated_kwargs,
action=self,
extra={"results": [], "rollback_stack": []},
)
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
for index, action in enumerate(self.actions):
results_context.current_index = index
prepared = action.prepare_for_chain(results_context)
result = await prepared(*args, **updated_kwargs)
results_context.add_result(result)
context.extra["results"].append(result)
context.extra["rollback_stack"].append(prepared)
context.result = context.extra["results"]
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result
except Exception as error:
context.exception = error
results_context.errors.append((results_context.current_index, error))
await self._rollback(context.extra["rollback_stack"], *args, **kwargs)
await self.hooks.trigger(HookType.ON_ERROR, context)
raise
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
async def _rollback(self, rollback_stack, *args, **kwargs):
for action in reversed(rollback_stack):
rollback = getattr(action, "rollback", None)
if rollback:
try:
logger.warning("[%s] ↩️ Rolling back...", action.name)
await action.rollback(*args, **kwargs)
except Exception as error:
logger.error("[%s]⚠️ Rollback failed: %s", action.name, error)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.CYAN_b}]⛓ ChainedAction[/] '{self.name}'"
if self.inject_last_result:
label += f" [dim](injects '{self.inject_last_result_as}')[/dim]"
tree = parent.add(label) if parent else Tree(label)
for action in self.actions:
await action.preview(parent=tree)
if not parent:
console.print(tree)
def register_hooks_recursively(self, hook_type: HookType, hook: Hook):
"""Register a hook for all actions and sub-actions."""
self.hooks.register(hook_type, hook)
for action in self.actions:
action.register_hooks_recursively(hook_type, hook)
class ActionGroup(BaseAction, ActionListMixin):
"""An ActionGroup is a collection of actions that can be run in parallel."""
def __init__(
self,
name: str,
actions: list[BaseAction] | None = None,
hooks: HookManager | None = None,
inject_last_result: bool = False,
inject_last_result_as: str = "last_result",
):
super().__init__(name, hooks, inject_last_result, inject_last_result_as)
ActionListMixin.__init__(self)
if actions:
self.set_actions(actions)
async def _run(self, *args, **kwargs) -> list[tuple[str, Any]]:
results_context = ResultsContext(name=self.name, is_parallel=True)
if self.results_context:
results_context.set_shared_result(self.results_context.last_result())
updated_kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(
name=self.name,
args=args,
kwargs=updated_kwargs,
action=self,
extra={"results": [], "errors": []},
)
async def run_one(action: BaseAction):
try:
prepared = action.prepare_for_group(results_context)
result = await prepared(*args, **updated_kwargs)
results_context.add_result((action.name, result))
context.extra["results"].append((action.name, result))
except Exception as error:
results_context.errors.append((results_context.current_index, error))
context.extra["errors"].append((action.name, error))
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
await asyncio.gather(*[run_one(a) for a in self.actions])
if context.extra["errors"]:
context.exception = Exception(
f"{len(context.extra['errors'])} action(s) failed: " +
", ".join(name for name, _ in context.extra["errors"])
)
await self.hooks.trigger(HookType.ON_ERROR, context)
raise context.exception
context.result = context.extra["results"]
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result
except Exception as error:
context.exception = error
raise
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.MAGENTA_b}]⏩ ActionGroup (parallel)[/] '{self.name}'"
if self.inject_last_result:
label += f" [dim](receives '{self.inject_last_result_as}')[/dim]"
tree = parent.add(label) if parent else Tree(label)
actions = self.actions.copy()
random.shuffle(actions)
await asyncio.gather(*(action.preview(parent=tree) for action in actions))
if not parent:
console.print(tree)
def register_hooks_recursively(self, hook_type: HookType, hook: Hook):
"""Register a hook for all actions and sub-actions."""
super().register_hooks_recursively(hook_type, hook)
for action in self.actions:
action.register_hooks_recursively(hook_type, hook)
class ProcessAction(BaseAction):
"""A ProcessAction runs a function in a separate process using ProcessPoolExecutor."""
def __init__(
self,
name: str,
func: Callable[..., Any],
args: tuple = (),
kwargs: dict[str, Any] | None = None,
hooks: HookManager | None = None,
executor: ProcessPoolExecutor | None = None,
inject_last_result: bool = False,
inject_last_result_as: str = "last_result",
):
super().__init__(name, hooks, inject_last_result, inject_last_result_as)
self.func = func
self.args = args
self.kwargs = kwargs or {}
self.executor = executor or ProcessPoolExecutor()
self.is_retryable = True
async def _run(self, *args, **kwargs):
if self.inject_last_result:
last_result = self.results_context.last_result()
if not self._validate_pickleable(last_result):
raise ValueError(
f"Cannot inject last result into {self.name}: "
f"last result is not pickleable."
)
combined_args = args + self.args
combined_kwargs = self._maybe_inject_last_result({**self.kwargs, **kwargs})
context = ExecutionContext(
name=self.name,
args=combined_args,
kwargs=combined_kwargs,
action=self,
)
loop = asyncio.get_running_loop()
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
result = await loop.run_in_executor(
self.executor, partial(self.func, *combined_args, **combined_kwargs)
)
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return result
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
if context.result is not None:
return context.result
raise
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.DARK_YELLOW_b}]🧠 ProcessAction (new process)[/] '{self.name}'"
if self.inject_last_result:
label += f" [dim](injects '{self.inject_last_result_as}')[/dim]"
if parent:
parent.add(label)
else:
console.print(Tree(label))
def _validate_pickleable(self, obj: Any) -> bool:
try:
import pickle
pickle.dumps(obj)
print("YES")
return True
except (pickle.PicklingError, TypeError):
print("NO")
return False

150
falyx/bottom_bar.py Normal file
View File

@ -0,0 +1,150 @@
"""bottom_bar.py"""
from typing import Any, Callable, Optional
from prompt_toolkit.formatted_text import HTML, merge_formatted_text
from prompt_toolkit.key_binding import KeyBindings
from rich.console import Console
from falyx.themes.colors import OneColors
from falyx.utils import CaseInsensitiveDict
class BottomBar:
"""Bottom Bar class for displaying a bottom bar in the terminal."""
def __init__(self, columns: int = 3, key_bindings: KeyBindings | None = None):
self.columns = columns
self.console = Console()
self._items: list[Callable[[], HTML]] = []
self._named_items: dict[str, Callable[[], HTML]] = {}
self._states: dict[str, Any] = CaseInsensitiveDict()
self.toggles: list[str] = []
self.key_bindings = key_bindings or KeyBindings()
def get_space(self) -> int:
return self.console.width // self.columns
def add_static(
self, name: str, text: str, fg: str = OneColors.BLACK, bg: str = OneColors.WHITE
) -> None:
def render():
return HTML(
f"<style fg='{fg}' bg='{bg}'>{text:^{self.get_space()}}</style>"
)
self._add_named(name, render)
def add_counter(
self,
name: str,
label: str,
current: int,
fg: str = OneColors.BLACK,
bg: str = OneColors.WHITE,
) -> None:
self._states[name] = (label, current)
def render():
label_, current_ = self._states[name]
text = f"{label_}: {current_}"
return HTML(
f"<style fg='{fg}' bg='{bg}'>{text:^{self.get_space()}}</style>"
)
self._add_named(name, render)
def add_total_counter(
self,
name: str,
label: str,
current: int,
total: int,
fg: str = OneColors.BLACK,
bg: str = OneColors.WHITE,
) -> None:
self._states[name] = (label, current, total)
if current > total:
raise ValueError(
f"Current value {current} is greater than total value {total}"
)
def render():
label_, current_, text_ = self._states[name]
text = f"{label_}: {current_}/{text_}"
return HTML(
f"<style fg='{fg}' bg='{bg}'>{text:^{self.get_space()}}</style>"
)
self._add_named(name, render)
def add_toggle(
self,
key: str,
label: str,
state: bool,
fg: str = OneColors.BLACK,
bg_on: str = OneColors.GREEN,
bg_off: str = OneColors.DARK_RED,
) -> None:
key = key.upper()
if key in self.toggles:
raise ValueError(f"Key {key} is already used as a toggle")
self._states[key] = (label, state)
self.toggles.append(key)
def render():
label_, state_ = self._states[key]
color = bg_on if state_ else bg_off
status = "ON" if state_ else "OFF"
text = f"({key.upper()}) {label_}: {status}"
return HTML(
f"<style bg='{color}' fg='{fg}'>{text:^{self.get_space()}}</style>"
)
self._add_named(key, render)
for k in (key.upper(), key.lower()):
@self.key_bindings.add(k)
def _(event, key=k):
self.toggle_state(key)
def toggle_state(self, name: str) -> bool:
label, state = self._states.get(name, (None, False))
new_state = not state
self.update_toggle(name, new_state)
return new_state
def update_toggle(self, name: str, state: bool) -> None:
if name in self._states:
label, _ = self._states[name]
self._states[name] = (label, state)
def increment_counter(self, name: str) -> None:
if name in self._states:
label, current = self._states[name]
self._states[name] = (label, current + 1)
def increment_total_counter(self, name: str) -> None:
if name in self._states:
label, current, total = self._states[name]
if current < total:
self._states[name] = (label, current + 1, total)
def update_counter(
self, name: str, current: Optional[int] = None, total: Optional[int] = None
) -> None:
if name in self._states:
label, c, t = self._states[name]
self._states[name] = (
label,
current if current is not None else c,
total if total is not None else t,
)
def _add_named(self, name: str, render_fn: Callable[[], HTML]) -> None:
self._named_items[name] = render_fn
self._items = list(self._named_items.values())
def render(self):
return merge_formatted_text([fn() for fn in self._items])

173
falyx/command.py Normal file
View File

@ -0,0 +1,173 @@
"""command.py
Any Action or Command is callable and supports the signature:
result = thing(*args, **kwargs)
This guarantees:
- Hook lifecycle (before/after/error/teardown)
- Timing
- Consistent return values
"""
from __future__ import annotations
from typing import Any, Callable
from prompt_toolkit.formatted_text import FormattedText
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from rich.console import Console
from rich.tree import Tree
from falyx.action import BaseAction
from falyx.context import ExecutionContext
from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.retry import RetryHandler, RetryPolicy
from falyx.themes.colors import OneColors
from falyx.utils import _noop, ensure_async, logger
console = Console()
class Command(BaseModel):
"""Class representing an command in the menu."""
key: str
description: str
aliases: list[str] = Field(default_factory=list)
action: BaseAction | Callable[[], Any] = _noop
args: tuple = ()
kwargs: dict[str, Any] = Field(default_factory=dict)
help_text: str = ""
color: str = OneColors.WHITE
confirm: bool = False
confirm_message: str = "Are you sure?"
preview_before_confirm: bool = True
spinner: bool = False
spinner_message: str = "Processing..."
spinner_type: str = "dots"
spinner_style: str = OneColors.CYAN
spinner_kwargs: dict[str, Any] = Field(default_factory=dict)
hooks: "HookManager" = Field(default_factory=HookManager)
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
tags: list[str] = Field(default_factory=list)
logging_hooks: bool = False
_context: ExecutionContext | None = PrivateAttr(default=None)
model_config = ConfigDict(arbitrary_types_allowed=True)
def model_post_init(self, __context: Any) -> None:
"""Post-initialization to set up the action and hooks."""
self._auto_register_retry_hook()
if self.logging_hooks and isinstance(self.action, BaseAction):
register_debug_hooks(self.action.hooks)
def _auto_register_retry_hook(self):
if (
self.retry_policy.is_active() and
isinstance(self.action, BaseAction) and
self.action.is_retryable
):
logger.debug(f"Auto-registering retry handler for action: {self.key}")
retry_handler = RetryHandler(self.retry_policy)
self.action.hooks.register(HookType.ON_ERROR, retry_handler.retry_on_error)
elif self.retry_policy.is_active():
logger.debug(f"Retry policy is active but action is not retryable: {self.key}")
def update_retry_policy(self, policy: RetryPolicy):
self.retry_policy = policy
self._auto_register_retry_hook()
@field_validator("action", mode="before")
@classmethod
def wrap_callable_as_async(cls, action: Any) -> Any:
if isinstance(action, BaseAction):
return action
elif callable(action):
return ensure_async(action)
raise TypeError("Action must be a callable or an instance of BaseAction")
def __str__(self):
return f"Command(key='{self.key}', description='{self.description}')"
async def __call__(self, *args, **kwargs):
"""Run the action with full hook lifecycle, timing, and error handling."""
combined_args = args + self.args
combined_kwargs = {**self.kwargs, **kwargs}
context = ExecutionContext(
name=self.description,
args=combined_args,
kwargs=combined_kwargs,
action=self,
)
self._context = context
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
result = await self.action(*combined_args, **combined_kwargs)
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
if context.result is not None:
logger.info(f"✅ Recovered: {self.key}")
return context.result
raise error
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
@property
def result(self) -> Any:
"""Get the result of the action."""
return self._context.result if self._context else None
@property
def confirmation_prompt(self) -> FormattedText:
"""Generate a styled prompt_toolkit FormattedText confirmation message."""
if self.confirm_message and self.confirm_message != "Are you sure?":
return FormattedText([
("class:confirm", self.confirm_message)
])
action_name = getattr(self.action, "__name__", None)
if isinstance(self.action, BaseAction):
action_name = self.action.name
prompt = [(OneColors.WHITE, "Confirm execution of ")]
prompt.append((OneColors.BLUE_b, f"{self.key}"))
prompt.append((OneColors.BLUE_b, f"{self.description} "))
if action_name:
prompt.append(("class:confirm", f"(calls `{action_name}`) "))
if self.args or self.kwargs:
prompt.append((OneColors.DARK_YELLOW, f"with args={self.args}, kwargs={self.kwargs} "))
return FormattedText(prompt)
def log_summary(self):
if self._context:
self._context.log_summary()
async def preview(self):
label = f"[{OneColors.GREEN_b}]Command:[/] '{self.key}'{self.description}"
if hasattr(self.action, "preview") and callable(self.action.preview):
tree = Tree(label)
await self.action.preview(parent=tree)
console.print(tree)
elif callable(self.action):
console.print(f"{label}")
console.print(
f"[{OneColors.LIGHT_RED_b}]→ Would call:[/] {self.action.__name__} "
f"[dim](args={self.args}, kwargs={self.kwargs})[/dim]"
)
else:
console.print(f"{label}")
console.print(f"[{OneColors.DARK_RED}]⚠️ Action is not callable or lacks a preview method.[/]")

103
falyx/config.py Normal file
View File

@ -0,0 +1,103 @@
"""config.py
Configuration loader for Falyx CLI commands."""
import importlib
from pathlib import Path
from typing import Any
import toml
import yaml
from falyx.action import Action, BaseAction
from falyx.command import Command
from falyx.retry import RetryPolicy
def wrap_if_needed(obj: Any, name=None) -> BaseAction | Command:
if isinstance(obj, (BaseAction, Command)):
return obj
elif callable(obj):
return Action(name=name or getattr(obj, "__name__", "unnamed"), action=obj)
else:
raise TypeError(
f"Cannot wrap object of type '{type(obj).__name__}' as a BaseAction or Command. "
"It must be a callable or an instance of BaseAction."
)
def import_action(dotted_path: str) -> Any:
"""Dynamically imports a callable from a dotted path like 'my.module.func'."""
module_path, _, attr = dotted_path.rpartition(".")
if not module_path:
raise ValueError(f"Invalid action path: {dotted_path}")
module = importlib.import_module(module_path)
return getattr(module, attr)
def loader(file_path: str) -> list[dict[str, Any]]:
"""
Load command definitions from a YAML or TOML file.
Each command should be defined as a dictionary with at least:
- key: a unique single-character key
- description: short description
- action: dotted import path to the action function/class
Args:
file_path (str): Path to the config file (YAML or TOML).
Returns:
list[dict[str, Any]]: A list of command configuration dictionaries.
Raises:
ValueError: If the file format is unsupported or file cannot be parsed.
"""
path = Path(file_path)
if not path.is_file():
raise FileNotFoundError(f"No such config file: {file_path}")
suffix = path.suffix
with path.open("r", encoding="UTF-8") as config_file:
if suffix in (".yaml", ".yml"):
raw_config = yaml.safe_load(config_file)
elif suffix == ".toml":
raw_config = toml.load(config_file)
else:
raise ValueError(f"Unsupported config format: {suffix}")
if not isinstance(raw_config, list):
raise ValueError("Configuration file must contain a list of command definitions.")
required = ["key", "description", "action"]
commands = []
for entry in raw_config:
for field in required:
if field not in entry:
raise ValueError(f"Missing '{field}' in command entry: {entry}")
command_dict = {
"key": entry["key"],
"description": entry["description"],
"aliases": entry.get("aliases", []),
"action": wrap_if_needed(import_action(entry["action"]),
name=entry["description"]),
"args": tuple(entry.get("args", ())),
"kwargs": entry.get("kwargs", {}),
"help_text": entry.get("help_text", ""),
"color": entry.get("color", "white"),
"confirm": entry.get("confirm", False),
"confirm_message": entry.get("confirm_message", "Are you sure?"),
"preview_before_confirm": entry.get("preview_before_confirm", True),
"spinner": entry.get("spinner", False),
"spinner_message": entry.get("spinner_message", "Processing..."),
"spinner_type": entry.get("spinner_type", "dots"),
"spinner_style": entry.get("spinner_style", "cyan"),
"spinner_kwargs": entry.get("spinner_kwargs", {}),
"tags": entry.get("tags", []),
"retry_policy": RetryPolicy(**entry.get("retry_policy", {})),
}
commands.append(command_dict)
return commands

127
falyx/context.py Normal file
View File

@ -0,0 +1,127 @@
"""context.py"""
import time
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field
class ExecutionContext(BaseModel):
name: str
args: tuple = ()
kwargs: dict = {}
action: Any
result: Any | None = None
exception: Exception | None = None
start_time: float | None = None
end_time: float | None = None
extra: dict[str, Any] = Field(default_factory=dict)
model_config = ConfigDict(arbitrary_types_allowed=True)
def start_timer(self):
self.start_time = time.perf_counter()
def stop_timer(self):
self.end_time = time.perf_counter()
@property
def duration(self) -> Optional[float]:
if self.start_time is not None and self.end_time is not None:
return self.end_time - self.start_time
return None
@property
def success(self) -> bool:
return self.exception is None
@property
def status(self) -> str:
return "OK" if self.success else "ERROR"
def as_dict(self) -> dict:
return {
"name": self.name,
"result": self.result,
"exception": repr(self.exception) if self.exception else None,
"duration": self.duration,
"extra": self.extra,
}
def log_summary(self, logger=None):
summary = self.as_dict()
msg = f"[SUMMARY] {summary['name']} | "
if self.start_time:
start_str = datetime.fromtimestamp(self.start_time).strftime("%H:%M:%S")
msg += f"Start: {start_str} | "
if self.end_time:
end_str = datetime.fromtimestamp(self.end_time).strftime("%H:%M:%S")
msg += f"End: {end_str} | "
msg += f"Duration: {summary['duration']:.3f}s | "
if summary["exception"]:
msg += f"❌ Exception: {summary['exception']}"
else:
msg += f"✅ Result: {summary['result']}"
(logger or print)(msg)
def to_log_line(self) -> str:
"""Structured flat-line format for logging and metrics."""
duration_str = f"{self.duration:.3f}s" if self.duration is not None else "n/a"
exception_str = f"{type(self.exception).__name__}: {self.exception}" if self.exception else "None"
return (
f"[{self.name}] status={self.status} duration={duration_str} "
f"result={repr(self.result)} exception={exception_str}"
)
def __str__(self) -> str:
duration_str = f"{self.duration:.3f}s" if self.duration is not None else "n/a"
result_str = f"Result: {repr(self.result)}" if self.success else f"Exception: {self.exception}"
return (
f"<ExecutionContext '{self.name}' | {self.status} | "
f"Duration: {duration_str} | {result_str}>"
)
def __repr__(self) -> str:
return (
f"ExecutionContext("
f"name={self.name!r}, "
f"duration={f'{self.duration:.3f}' if self.duration is not None else 'n/a'}, "
f"result={self.result!r}, "
f"exception={self.exception!r})"
)
class ResultsContext(BaseModel):
name: str
results: list[Any] = Field(default_factory=list)
errors: list[tuple[int, Exception]] = Field(default_factory=list)
current_index: int = -1
is_parallel: bool = False
shared_result: Any | None = None
model_config = ConfigDict(arbitrary_types_allowed=True)
def add_result(self, result: Any) -> None:
self.results.append(result)
def set_shared_result(self, result: Any) -> None:
self.shared_result = result
if self.is_parallel:
self.results.append(result)
def last_result(self) -> Any:
if self.is_parallel:
return self.shared_result
return self.results[-1] if self.results else None
def __str__(self) -> str:
parallel_label = "Parallel" if self.is_parallel else "Sequential"
return (
f"<{parallel_label}ResultsContext '{self.name}' | "
f"Results: {self.results} | "
f"Errors: {self.errors}>"
)

43
falyx/debug.py Normal file
View File

@ -0,0 +1,43 @@
from falyx.context import ExecutionContext
from falyx.hook_manager import HookManager, HookType
from falyx.utils import logger
def log_before(context: ExecutionContext):
"""Log the start of an action."""
args = ", ".join(map(repr, context.args))
kwargs = ", ".join(f"{k}={v!r}" for k, v in context.kwargs.items())
signature = ", ".join(filter(None, [args, kwargs]))
logger.info("[%s] 🚀 Starting → %s(%s)", context.name, context.action, signature)
def log_success(context: ExecutionContext):
"""Log the successful completion of an action."""
result_str = repr(context.result)
if len(result_str) > 100:
result_str = result_str[:100] + "..."
logger.debug("[%s] ✅ Success → Result: %s", context.name, result_str)
def log_after(context: ExecutionContext):
"""Log the completion of an action, regardless of success or failure."""
logger.debug("[%s] ⏱️ Finished in %.3fs", context.name, context.duration)
def log_error(context: ExecutionContext):
"""Log an error that occurred during the action."""
logger.error(
"[%s] ❌ Error (%s): %s",
context.name,
type(context.exception).__name__,
context.exception,
exc_info=True,
)
def register_debug_hooks(hooks: HookManager):
hooks.register(HookType.BEFORE, log_before)
hooks.register(HookType.AFTER, log_after)
hooks.register(HookType.ON_SUCCESS, log_success)
hooks.register(HookType.ON_ERROR, log_error)

22
falyx/exceptions.py Normal file
View File

@ -0,0 +1,22 @@
class FalyxError(Exception):
"""Custom exception for the Menu class."""
class CommandAlreadyExistsError(FalyxError):
"""Exception raised when an command with the same key already exists in the menu."""
class InvalidHookError(FalyxError):
"""Exception raised when a hook is not callable."""
class InvalidActionError(FalyxError):
"""Exception raised when an action is not callable."""
class NotAFalyxError(FalyxError):
"""Exception raised when the provided submenu is not an instance of Menu."""
class CircuitBreakerOpen(FalyxError):
"""Exception raised when the circuit breaker is open."""

View File

@ -0,0 +1,76 @@
"""execution_registry.py"""
from collections import defaultdict
from datetime import datetime
from typing import Dict, List
from rich import box
from rich.console import Console
from rich.table import Table
from falyx.context import ExecutionContext
from falyx.utils import logger
class ExecutionRegistry:
_store_by_name: Dict[str, List[ExecutionContext]] = defaultdict(list)
_store_all: List[ExecutionContext] = []
_console = Console(color_system="truecolor")
@classmethod
def record(cls, context: ExecutionContext):
"""Record an execution context."""
logger.debug(context.to_log_line())
cls._store_by_name[context.name].append(context)
cls._store_all.append(context)
@classmethod
def get_all(cls) -> List[ExecutionContext]:
return cls._store_all
@classmethod
def get_by_name(cls, name: str) -> List[ExecutionContext]:
return cls._store_by_name.get(name, [])
@classmethod
def get_latest(cls) -> ExecutionContext:
return cls._store_all[-1]
@classmethod
def clear(cls):
cls._store_by_name.clear()
cls._store_all.clear()
@classmethod
def summary(cls):
table = Table(title="[📊] Execution History", expand=True, box=box.SIMPLE)
table.add_column("Name", style="bold cyan")
table.add_column("Start", justify="right", style="dim")
table.add_column("End", justify="right", style="dim")
table.add_column("Duration", justify="right")
table.add_column("Status", style="bold")
table.add_column("Result / Exception", overflow="fold")
for ctx in cls.get_all():
start = datetime.fromtimestamp(ctx.start_time).strftime("%H:%M:%S") if ctx.start_time else "n/a"
end = datetime.fromtimestamp(ctx.end_time).strftime("%H:%M:%S") if ctx.end_time else "n/a"
duration = f"{ctx.duration:.3f}s" if ctx.duration else "n/a"
if ctx.exception:
status = "[bold red]❌ Error"
result = repr(ctx.exception)
else:
status = "[green]✅ Success"
result = repr(ctx.result)
table.add_row(ctx.name, start, end, duration, status, result)
cls._console.print(table)
@classmethod
def get_history_action(cls) -> "Action":
"""Return an Action that prints the execution summary."""
from falyx.action import Action
async def show_history():
cls.summary()
return Action(name="View Execution History", action=show_history)

831
falyx/falyx.py Normal file
View File

@ -0,0 +1,831 @@
"""falyx.py
This class creates a Falyx object that creates a selectable menu
with customizable commands and functionality.
It allows for adding commands, and their accompanying actions,
and provides a method to display the menu and handle user input.
This class uses the `rich` library to display the menu in a
formatted and visually appealing way.
This class also uses the `prompt_toolkit` library to handle
user input and create an interactive experience.
"""
import asyncio
import logging
import sys
from argparse import ArgumentParser, Namespace
from difflib import get_close_matches
from functools import cached_property
from typing import Any, Callable
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.formatted_text import AnyFormattedText
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.validation import Validator
from rich import box
from rich.console import Console
from rich.markdown import Markdown
from rich.table import Table
from falyx.action import BaseAction
from falyx.bottom_bar import BottomBar
from falyx.command import Command
from falyx.context import ExecutionContext
from falyx.debug import register_debug_hooks, log_before, log_success, log_error, log_after
from falyx.exceptions import (CommandAlreadyExistsError, FalyxError,
InvalidActionError, NotAFalyxError)
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType, Hook
from falyx.retry import RetryPolicy
from falyx.themes.colors import OneColors, get_nord_theme
from falyx.utils import CaseInsensitiveDict, async_confirm, chunks, logger
from falyx.version import __version__
class Falyx:
"""Class to create a menu with commands.
Hook functions must have the signature:
def hook(command: Command) -> None:
where `command` is the selected command.
Error hook functions must have the signature:
def error_hook(command: Command, error: Exception) -> None:
where `command` is the selected command and `error` is the exception raised.
Hook execution order:
1. Before action hooks of the menu.
2. Before action hooks of the selected command.
3. Action of the selected command.
4. After action hooks of the selected command.
5. After action hooks of the menu.
6. On error hooks of the selected command (if an error occurs).
7. On error hooks of the menu (if an error occurs).
Parameters:
title (str|Markdown): The title of the menu.
columns (int): The number of columns to display the commands in.
prompt (AnyFormattedText): The prompt to display when asking for input.
bottom_bar (str|callable|None): The text to display in the bottom bar.
"""
def __init__(
self,
title: str | Markdown = "Menu",
prompt: str | AnyFormattedText = "> ",
columns: int = 3,
bottom_bar: BottomBar | str | Callable[[], None] | None = None,
welcome_message: str | Markdown = "",
exit_message: str | Markdown = "",
key_bindings: KeyBindings | None = None,
include_history_command: bool = True,
include_help_command: bool = False,
confirm_on_error: bool = True,
never_confirm: bool = False,
always_confirm: bool = False,
cli_args: Namespace | None = None,
custom_table: Callable[["Falyx"], Table] | Table | None = None,
) -> None:
"""Initializes the Falyx object."""
self.title: str | Markdown = title
self.prompt: str | AnyFormattedText = prompt
self.columns: int = columns
self.commands: dict[str, Command] = CaseInsensitiveDict()
self.exit_command: Command = self._get_exit_command()
self.history_command: Command | None = self._get_history_command() if include_history_command else None
self.help_command: Command | None = self._get_help_command() if include_help_command else None
self.console: Console = Console(color_system="truecolor", theme=get_nord_theme())
self.welcome_message: str | Markdown = welcome_message
self.exit_message: str | Markdown = exit_message
self.hooks: HookManager = HookManager()
self.last_run_command: Command | None = None
self.key_bindings: KeyBindings = key_bindings or KeyBindings()
self.bottom_bar: BottomBar | str | Callable[[], None] = bottom_bar or BottomBar(columns=columns, key_bindings=self.key_bindings)
self.confirm_on_error: bool = confirm_on_error
self._never_confirm: bool = never_confirm
self._always_confirm: bool = always_confirm
self.cli_args: Namespace | None = cli_args
self.custom_table: Callable[["Falyx"], Table] | Table | None = custom_table
@property
def _name_map(self) -> dict[str, Command]:
"""Builds a mapping of all valid input names (keys, aliases, normalized names) to Command objects.
If a collision occurs, logs a warning and keeps the first registered command.
"""
mapping: dict[str, Command] = {}
def register(name: str, cmd: Command):
norm = name.upper().strip()
if norm in mapping:
existing = mapping[norm]
if existing is not cmd:
logger.warning(
f"[alias conflict] '{name}' already assigned to '{existing.description}'."
f" Skipping for '{cmd.description}'."
)
else:
mapping[norm] = cmd
for special in [self.exit_command, self.history_command, self.help_command]:
if special:
register(special.key, special)
for alias in special.aliases:
register(alias, special)
register(special.description, special)
for cmd in self.commands.values():
register(cmd.key, cmd)
for alias in cmd.aliases:
register(alias, cmd)
register(cmd.description, cmd)
return mapping
def get_title(self) -> str:
"""Returns the string title of the menu."""
if isinstance(self.title, str):
return self.title
elif isinstance(self.title, Markdown):
return self.title.markup
return self.title
def _get_exit_command(self) -> Command:
"""Returns the back command for the menu."""
return Command(
key="Q",
description="Exit",
aliases=["EXIT", "QUIT"],
color=OneColors.DARK_RED,
)
def _get_history_command(self) -> Command:
"""Returns the history command for the menu."""
return Command(
key="Y",
description="History",
aliases=["HISTORY"],
action=er.get_history_action(),
color=OneColors.DARK_YELLOW,
)
async def _show_help(self):
table = Table(title="[bold cyan]Help Menu[/]", box=box.SIMPLE)
table.add_column("Key", style="bold", no_wrap=True)
table.add_column("Aliases", style="dim", no_wrap=True)
table.add_column("Description", style="dim", overflow="fold")
table.add_column("Tags", style="dim", no_wrap=True)
for command in self.commands.values():
help_text = command.help_text or command.description
table.add_row(
f"[{command.color}]{command.key}[/]",
", ".join(command.aliases) if command.aliases else "None",
help_text,
", ".join(command.tags) if command.tags else "None"
)
table.add_row(
f"[{self.exit_command.color}]{self.exit_command.key}[/]",
", ".join(self.exit_command.aliases),
"Exit this menu or program"
)
if self.history_command:
table.add_row(
f"[{self.history_command.color}]{self.history_command.key}[/]",
", ".join(self.history_command.aliases),
"History of executed actions"
)
if self.help_command:
table.add_row(
f"[{self.help_command.color}]{self.help_command.key}[/]",
", ".join(self.help_command.aliases),
"Show this help menu"
)
self.console.print(table)
def _get_help_command(self) -> Command:
"""Returns the help command for the menu."""
return Command(
key="H",
aliases=["HELP"],
description="Help",
action=self._show_help,
color=OneColors.LIGHT_YELLOW,
)
def _get_completer(self) -> WordCompleter:
"""Completer to provide auto-completion for the menu commands."""
keys = [self.exit_command.key]
keys.extend(self.exit_command.aliases)
if self.history_command:
keys.append(self.history_command.key)
keys.extend(self.history_command.aliases)
if self.help_command:
keys.append(self.help_command.key)
keys.extend(self.help_command.aliases)
for cmd in self.commands.values():
keys.append(cmd.key)
keys.extend(cmd.aliases)
return WordCompleter(keys, ignore_case=True)
def _get_validator(self) -> Validator:
"""Validator to check if the input is a valid command or toggle key."""
keys = {self.exit_command.key.upper()}
keys.update({alias.upper() for alias in self.exit_command.aliases})
if self.history_command:
keys.add(self.history_command.key.upper())
keys.update({alias.upper() for alias in self.history_command.aliases})
if self.help_command:
keys.add(self.help_command.key.upper())
keys.update({alias.upper() for alias in self.help_command.aliases})
for cmd in self.commands.values():
keys.add(cmd.key.upper())
keys.update({alias.upper() for alias in cmd.aliases})
if isinstance(self.bottom_bar, BottomBar):
toggle_keys = {key.upper() for key in self.bottom_bar.toggles}
else:
toggle_keys = set()
commands_str = ", ".join(sorted(keys))
toggles_str = ", ".join(sorted(toggle_keys))
message_lines = ["Invalid input. Available keys:"]
if keys:
message_lines.append(f" Commands: {commands_str}")
if toggle_keys:
message_lines.append(f" Toggles: {toggles_str}")
error_message = " ".join(message_lines)
def validator(text):
return True if self.get_command(text, from_validate=True) else False
return Validator.from_callable(
validator,
error_message=error_message,
move_cursor_to_end=True,
)
def _invalidate_session_cache(self):
"""Forces the session to be recreated on the next access."""
if hasattr(self, "session"):
del self.session
def add_toggle(self, key: str, label: str, state: bool) -> None:
"""Adds a toggle to the bottom bar."""
assert isinstance(self.bottom_bar, BottomBar), "Bottom bar must be an instance of BottomBar."
self.bottom_bar.add_toggle(key, label, state)
self._invalidate_session_cache()
def add_counter(self, name: str, label: str, current: int) -> None:
"""Adds a counter to the bottom bar."""
assert isinstance(self.bottom_bar, BottomBar), "Bottom bar must be an instance of BottomBar."
self.bottom_bar.add_counter(name, label, current)
self._invalidate_session_cache()
def add_total_counter(self, name: str, label: str, current: int, total: int) -> None:
"""Adds a counter to the bottom bar."""
assert isinstance(self.bottom_bar, BottomBar), "Bottom bar must be an instance of BottomBar."
self.bottom_bar.add_total_counter(name, label, current, total)
self._invalidate_session_cache()
def add_static(self, name: str, text: str) -> None:
"""Adds a static element to the bottom bar."""
assert isinstance(self.bottom_bar, BottomBar), "Bottom bar must be an instance of BottomBar."
self.bottom_bar.add_static(name, text)
self._invalidate_session_cache
def get_toggle_state(self, key: str) -> bool | None:
assert isinstance(self.bottom_bar, BottomBar), "Bottom bar must be an instance of BottomBar."
if key.upper() in self.bottom_bar._states:
"""Returns the state of a toggle."""
return self.bottom_bar._states[key.upper()][1]
return None
def add_help_command(self):
"""Adds a help command to the menu if it doesn't already exist."""
if not self.help_command:
self.help_command = self._get_help_command()
self._invalidate_session_cache()
def add_history_command(self):
"""Adds a history command to the menu if it doesn't already exist."""
if not self.history_command:
self.history_command = self._get_history_command()
self._invalidate_session_cache()
def _get_bottom_bar(self) -> Callable[[], Any] | str | None:
"""Returns the bottom bar for the menu."""
if isinstance(self.bottom_bar, BottomBar) and self.bottom_bar._items:
return self.bottom_bar.render
elif callable(self.bottom_bar):
return self.bottom_bar
elif isinstance(self.bottom_bar, str):
return self.bottom_bar
return None
@cached_property
def session(self) -> PromptSession:
"""Returns the prompt session for the menu."""
return PromptSession(
message=self.prompt,
multiline=False,
completer=self._get_completer(),
reserve_space_for_menu=1,
validator=self._get_validator(),
bottom_toolbar=self._get_bottom_bar(),
key_bindings=self.key_bindings,
)
def register_all_hooks(self, hook_type: HookType, hooks: Hook | list[Hook]) -> None:
"""Registers hooks for all commands in the menu and actions recursively."""
hook_list = hooks if isinstance(hooks, list) else [hooks]
for hook in hook_list:
if not callable(hook):
raise InvalidActionError("Hook must be a callable.")
self.hooks.register(hook_type, hook)
for command in self.commands.values():
command.hooks.register(hook_type, hook)
if isinstance(command.action, Falyx):
command.action.register_all_hooks(hook_type, hook)
if isinstance(command.action, BaseAction):
command.action.register_hooks_recursively(hook_type, hook)
def register_all_with_debug_hooks(self) -> None:
"""Registers debug hooks for all commands in the menu and actions recursively."""
self.register_all_hooks(HookType.BEFORE, log_before)
self.register_all_hooks(HookType.ON_SUCCESS, log_success)
self.register_all_hooks(HookType.ON_ERROR, log_error)
self.register_all_hooks(HookType.AFTER, log_after)
def debug_hooks(self) -> None:
"""Logs the names of all hooks registered for the menu and its commands."""
def hook_names(hook_list):
return [hook.__name__ for hook in hook_list]
logger.debug(f"Menu-level before hooks: {hook_names(self.hooks._hooks[HookType.BEFORE])}")
logger.debug(f"Menu-level success hooks: {hook_names(self.hooks._hooks[HookType.ON_SUCCESS])}")
logger.debug(f"Menu-level error hooks: {hook_names(self.hooks._hooks[HookType.ON_ERROR])}")
logger.debug(f"Menu-level after hooks: {hook_names(self.hooks._hooks[HookType.AFTER])}")
logger.debug(f"Menu-level on_teardown hooks: {hook_names(self.hooks._hooks[HookType.ON_TEARDOWN])}")
for key, command in self.commands.items():
logger.debug(f"[Command '{key}'] before: {hook_names(command.hooks._hooks[HookType.BEFORE])}")
logger.debug(f"[Command '{key}'] success: {hook_names(command.hooks._hooks[HookType.ON_SUCCESS])}")
logger.debug(f"[Command '{key}'] error: {hook_names(command.hooks._hooks[HookType.ON_ERROR])}")
logger.debug(f"[Command '{key}'] after: {hook_names(command.hooks._hooks[HookType.AFTER])}")
logger.debug(f"[Command '{key}'] on_teardown: {hook_names(command.hooks._hooks[HookType.ON_TEARDOWN])}")
def _validate_command_key(self, key: str) -> None:
"""Validates the command key to ensure it is unique."""
key = key.upper()
toggles = self.bottom_bar.toggles if isinstance(self.bottom_bar, BottomBar) else []
collisions = []
if key in self.commands:
collisions.append("command")
if key == self.exit_command.key.upper():
collisions.append("back command")
if self.history_command and key == self.history_command.key.upper():
collisions.append("history command")
if self.help_command and key == self.help_command.key.upper():
collisions.append("help command")
if key in toggles:
collisions.append("toggle")
if collisions:
raise CommandAlreadyExistsError(f"Command key '{key}' conflicts with existing {', '.join(collisions)}.")
def update_exit_command(
self,
key: str = "0",
description: str = "Exit",
action: Callable[[], Any] = lambda: None,
color: str = OneColors.DARK_RED,
confirm: bool = False,
confirm_message: str = "Are you sure?",
) -> None:
"""Updates the back command of the menu."""
if not callable(action):
raise InvalidActionError("Action must be a callable.")
self._validate_command_key(key)
self.exit_command = Command(
key=key,
description=description,
action=action,
color=color,
confirm=confirm,
confirm_message=confirm_message,
)
self._invalidate_session_cache()
def add_submenu(self, key: str, description: str, submenu: "Falyx", color: str = OneColors.CYAN) -> None:
"""Adds a submenu to the menu."""
if not isinstance(submenu, Falyx):
raise NotAFalyxError("submenu must be an instance of Falyx.")
self._validate_command_key(key)
self.add_command(key, description, submenu.menu, color=color)
self._invalidate_session_cache()
def add_commands(self, commands: list[dict]) -> None:
"""Adds multiple commands to the menu."""
for command in commands:
self.add_command(**command)
def add_command(
self,
key: str,
description: str,
action: BaseAction | Callable[[], Any],
aliases: list[str] | None = None,
args: tuple = (),
kwargs: dict[str, Any] = {},
help_text: str = "",
color: str = OneColors.WHITE,
confirm: bool = False,
confirm_message: str = "Are you sure?",
preview_before_confirm: bool = True,
spinner: bool = False,
spinner_message: str = "Processing...",
spinner_type: str = "dots",
spinner_style: str = OneColors.CYAN,
spinner_kwargs: dict[str, Any] | None = None,
hooks: HookManager | None = None,
before_hooks: list[Callable] | None = None,
success_hooks: list[Callable] | None = None,
after_hooks: list[Callable] | None = None,
error_hooks: list[Callable] | None = None,
teardown_hooks: list[Callable] | None = None,
tags: list[str] | None = None,
retry_policy: RetryPolicy | None = None,
) -> Command:
"""Adds an command to the menu, preventing duplicates."""
self._validate_command_key(key)
command = Command(
key=key,
description=description,
aliases=aliases if aliases else [],
help_text=help_text,
action=action,
args=args,
kwargs=kwargs,
color=color,
confirm=confirm,
confirm_message=confirm_message,
preview_before_confirm=preview_before_confirm,
spinner=spinner,
spinner_message=spinner_message,
spinner_type=spinner_type,
spinner_style=spinner_style,
spinner_kwargs=spinner_kwargs or {},
tags=tags if tags else [],
retry_policy=retry_policy if retry_policy else RetryPolicy(),
)
if hooks:
if not isinstance(hooks, HookManager):
raise NotAFalyxError("hooks must be an instance of HookManager.")
command.hooks = hooks
for hook in before_hooks or []:
command.hooks.register(HookType.BEFORE, hook)
for hook in success_hooks or []:
command.hooks.register(HookType.ON_SUCCESS, hook)
for hook in error_hooks or []:
command.hooks.register(HookType.ON_ERROR, hook)
for hook in after_hooks or []:
command.hooks.register(HookType.AFTER, hook)
for hook in teardown_hooks or []:
command.hooks.register(HookType.ON_TEARDOWN, hook)
self.commands[key] = command
self._invalidate_session_cache()
return command
def get_bottom_row(self) -> list[str]:
"""Returns the bottom row of the table for displaying additional commands."""
bottom_row = []
if self.history_command:
bottom_row.append(f"[{self.history_command.key}] [{self.history_command.color}]{self.history_command.description}")
if self.help_command:
bottom_row.append(f"[{self.help_command.key}] [{self.help_command.color}]{self.help_command.description}")
bottom_row.append(f"[{self.exit_command.key}] [{self.exit_command.color}]{self.exit_command.description}")
return bottom_row
def build_default_table(self) -> Table:
"""Build the standard table layout. Developers can subclass or call this in custom tables."""
table = Table(title=self.title, show_header=False, box=box.SIMPLE, expand=True)
for chunk in chunks(self.commands.items(), self.columns):
row = []
for key, command in chunk:
row.append(f"[{key}] [{command.color}]{command.description}")
table.add_row(*row)
bottom_row = self.get_bottom_row()
table.add_row(*bottom_row)
return table
@property
def table(self) -> Table:
"""Creates or returns a custom table to display the menu commands."""
if callable(self.custom_table):
return self.custom_table(self)
elif isinstance(self.custom_table, Table):
return self.custom_table
else:
return self.build_default_table()
def get_command(self, choice: str, from_validate=False) -> Command | None:
"""Returns the selected command based on user input. Supports keys, aliases, and abbreviations."""
choice = choice.upper()
name_map = self._name_map
if choice in name_map:
return name_map[choice]
prefix_matches = [cmd for key, cmd in name_map.items() if key.startswith(choice)]
if len(prefix_matches) == 1:
return prefix_matches[0]
fuzzy_matches = get_close_matches(choice, list(name_map.keys()), n=3, cutoff=0.7)
if fuzzy_matches:
if not from_validate:
self.console.print(f"[{OneColors.LIGHT_YELLOW}]⚠️ Unknown command '{choice}'. Did you mean:[/] ")
for match in fuzzy_matches:
cmd = name_map[match]
self.console.print(f" • [bold]{match}[/] → {cmd.description}")
else:
if not from_validate:
self.console.print(f"[{OneColors.LIGHT_YELLOW}]⚠️ Unknown command '{choice}'[/]")
return None
async def _should_run_action(self, selected_command: Command) -> bool:
if self._never_confirm:
return True
if self.cli_args and getattr(self.cli_args, "skip_confirm", False):
return True
if (self._always_confirm or
selected_command.confirm or
self.cli_args and getattr(self.cli_args, "force_confirm", False)
):
if selected_command.preview_before_confirm:
await selected_command.preview()
confirm_answer = await async_confirm(selected_command.confirmation_prompt)
if confirm_answer:
logger.info(f"[{OneColors.LIGHT_YELLOW}][{selected_command.description}]🔐 confirmed.")
else:
logger.info(f"[{OneColors.DARK_RED}][{selected_command.description}]❌ cancelled.")
return confirm_answer
return True
def _create_context(self, selected_command: Command) -> ExecutionContext:
"""Creates a context dictionary for the selected command."""
return ExecutionContext(
name=selected_command.description,
args=tuple(),
kwargs={},
action=selected_command,
)
async def _run_action_with_spinner(self, command: Command) -> Any:
"""Runs the action of the selected command with a spinner."""
with self.console.status(
command.spinner_message,
spinner=command.spinner_type,
spinner_style=command.spinner_style,
**command.spinner_kwargs,
):
return await command()
async def _handle_action_error(self, selected_command: Command, error: Exception) -> bool:
"""Handles errors that occur during the action of the selected command."""
logger.exception(f"Error executing '{selected_command.description}': {error}")
self.console.print(f"[{OneColors.DARK_RED}]An error occurred while executing "
f"{selected_command.description}:[/] {error}")
if self.confirm_on_error and not self._never_confirm:
return await async_confirm("An error occurred. Do you wish to continue?")
if self._never_confirm:
return True
return False
async def process_command(self) -> bool:
"""Processes the action of the selected command."""
choice = await self.session.prompt_async()
selected_command = self.get_command(choice)
if not selected_command:
logger.info(f"[{OneColors.LIGHT_YELLOW}] Invalid command '{choice}'.")
return True
self.last_run_command = selected_command
if selected_command == self.exit_command:
logger.info(f"🔙 Back selected: exiting {self.get_title()}")
return False
if not await self._should_run_action(selected_command):
logger.info(f"[{OneColors.DARK_RED}] {selected_command.description} cancelled.")
return True
context = self._create_context(selected_command)
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
if selected_command.spinner:
result = await self._run_action_with_spinner(selected_command)
else:
result = await selected_command()
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
if not context.exception:
logger.info(f"✅ Recovery hook handled error for '{selected_command.description}'")
context.result = result
else:
return await self._handle_action_error(selected_command, error)
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
return True
async def headless(self, command_key: str, return_context: bool = False) -> Any:
"""Runs the action of the selected command without displaying the menu."""
self.debug_hooks()
selected_command = self.get_command(command_key)
self.last_run_command = selected_command
if not selected_command:
logger.info("[Headless] Back command selected. Exiting menu.")
return
logger.info(f"[Headless] 🚀 Running: '{selected_command.description}'")
if not await self._should_run_action(selected_command):
raise FalyxError(f"[Headless] '{selected_command.description}' cancelled by confirmation.")
context = self._create_context(selected_command)
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
if selected_command.spinner:
result = await self._run_action_with_spinner(selected_command)
else:
result = await selected_command()
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
logger.info(f"[Headless] ✅ '{selected_command.description}' complete.")
except (KeyboardInterrupt, EOFError):
raise FalyxError(f"[Headless] ⚠️ '{selected_command.description}' interrupted by user.")
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
if not context.exception:
logger.info(f"[Headless] ✅ Recovery hook handled error for '{selected_command.description}'")
return True
raise FalyxError(f"[Headless] ❌ '{selected_command.description}' failed.") from error
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
return context if return_context else context.result
def _set_retry_policy(self, selected_command: Command) -> None:
"""Sets the retry policy for the command based on CLI arguments."""
assert isinstance(self.cli_args, Namespace), "CLI arguments must be provided."
if self.cli_args.retries or self.cli_args.retry_delay or self.cli_args.retry_backoff:
selected_command.retry_policy.enabled = True
if self.cli_args.retries:
selected_command.retry_policy.max_retries = self.cli_args.retries
if self.cli_args.retry_delay:
selected_command.retry_policy.delay = self.cli_args.retry_delay
if self.cli_args.retry_backoff:
selected_command.retry_policy.backoff = self.cli_args.retry_backoff
selected_command.update_retry_policy(selected_command.retry_policy)
def get_arg_parser(self) -> ArgumentParser:
"""Returns the argument parser for the CLI."""
parser = ArgumentParser(prog="falyx", description="Falyx CLI - Run structured async command workflows.")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging for Falyx.")
parser.add_argument("--debug-hooks", action="store_true", help="Enable default lifecycle debug logging")
parser.add_argument("--version", action="store_true", help="Show Falyx version")
subparsers = parser.add_subparsers(dest="command")
run_parser = subparsers.add_parser("run", help="Run a specific command")
run_parser.add_argument("name", help="Key, alias, or description of the command")
run_parser.add_argument("--retries", type=int, help="Number of retries on failure", default=0)
run_parser.add_argument("--retry-delay", type=float, help="Initial delay between retries in (seconds)", default=0)
run_parser.add_argument("--retry-backoff", type=float, help="Backoff factor for retries", default=0)
run_group = run_parser.add_mutually_exclusive_group(required=False)
run_group.add_argument("-c", "--confirm", dest="force_confirm", action="store_true", help="Force confirmation prompts")
run_group.add_argument("-s", "--skip-confirm", dest="skip_confirm", action="store_true", help="Skip confirmation prompts")
run_all_parser = subparsers.add_parser("run-all", help="Run all commands with a given tag")
run_all_parser.add_argument("-t", "--tag", required=True, help="Tag to match")
run_all_parser.add_argument("--retries", type=int, help="Number of retries on failure", default=0)
run_all_parser.add_argument("--retry-delay", type=float, help="Initial delay between retries in (seconds)", default=0)
run_all_parser.add_argument("--retry-backoff", type=float, help="Backoff factor for retries", default=0)
run_all_group = run_all_parser.add_mutually_exclusive_group(required=False)
run_all_group.add_argument("-c", "--confirm", dest="force_confirm", action="store_true", help="Force confirmation prompts")
run_all_group.add_argument("-s", "--skip-confirm", dest="skip_confirm", action="store_true", help="Skip confirmation prompts")
preview_parser = subparsers.add_parser("preview", help="Preview a command without running it")
preview_parser.add_argument("name", help="Key, alias, or description of the command")
subparsers.add_parser("list", help="List all available commands with tags")
subparsers.add_parser("version", help="Show the Falyx version")
return parser
async def cli(self, parser: ArgumentParser | None = None) -> None:
"""Run Falyx CLI with structured subcommands."""
parser = parser or self.get_arg_parser()
self.cli_args = parser.parse_args()
if self.cli_args.verbose:
logging.getLogger("falyx").setLevel(logging.DEBUG)
if self.cli_args.debug_hooks:
logger.debug("✅ Enabling global debug hooks for all commands")
self.register_all_with_debug_hooks()
if self.cli_args.command == "list":
await self._show_help()
sys.exit(0)
if self.cli_args.command == "version" or self.cli_args.version:
self.console.print(f"[{OneColors.GREEN_b}]Falyx CLI v{__version__}[/]")
sys.exit(0)
if self.cli_args.command == "preview":
command = self.get_command(self.cli_args.name)
if not command:
self.console.print(f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found.[/]")
sys.exit(1)
self.console.print(f"Preview of command '{command.key}': {command.description}")
await command.preview()
sys.exit(0)
if self.cli_args.command == "run":
command = self.get_command(self.cli_args.name)
if not command:
self.console.print(f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found.[/]")
sys.exit(1)
self._set_retry_policy(command)
try:
result = await self.headless(self.cli_args.name)
except FalyxError as error:
self.console.print(f"[{OneColors.DARK_RED}]❌ Error: {error}[/]")
sys.exit(1)
self.console.print(f"[{OneColors.GREEN}]✅ Result:[/] {result}")
sys.exit(0)
if self.cli_args.command == "run-all":
matching = [
cmd for cmd in self.commands.values()
if self.cli_args.tag.lower() in (tag.lower() for tag in cmd.tags)
]
if not matching:
self.console.print(f"[{OneColors.LIGHT_YELLOW}]⚠️ No commands found with tag: '{self.cli_args.tag}'[/]")
sys.exit(1)
self.console.print(f"[bold cyan]🚀 Running all commands with tag:[/] {self.cli_args.tag}")
for cmd in matching:
self._set_retry_policy(cmd)
await self.headless(cmd.key)
sys.exit(0)
await self.menu()
async def menu(self) -> None:
"""Runs the menu and handles user input."""
logger.info(f"Running menu: {self.get_title()}")
self.debug_hooks()
if self.welcome_message:
self.console.print(self.welcome_message)
while True:
self.console.print(self.table)
try:
task = asyncio.create_task(self.process_command())
should_continue = await task
if not should_continue:
break
except (EOFError, KeyboardInterrupt):
logger.info(f"[{OneColors.DARK_RED}]EOF or KeyboardInterrupt. Exiting menu.")
break
logger.info(f"Exiting menu: {self.get_title()}")
if self.exit_message:
self.console.print(self.exit_message)

68
falyx/hook_manager.py Normal file
View File

@ -0,0 +1,68 @@
"""hook_manager.py"""
from __future__ import annotations
import inspect
from enum import Enum
from typing import Awaitable, Callable, Dict, List, Optional, Union
from falyx.context import ExecutionContext
from falyx.utils import logger
Hook = Union[
Callable[[ExecutionContext], None],
Callable[[ExecutionContext], Awaitable[None]]
]
class HookType(Enum):
"""Enum for hook types to categorize the hooks."""
BEFORE = "before"
ON_SUCCESS = "on_success"
ON_ERROR = "on_error"
AFTER = "after"
ON_TEARDOWN = "on_teardown"
@classmethod
def choices(cls) -> List[HookType]:
"""Return a list of all hook type choices."""
return list(cls)
def __str__(self) -> str:
"""Return the string representation of the hook type."""
return self.value
class HookManager:
def __init__(self) -> None:
self._hooks: Dict[HookType, List[Hook]] = {
hook_type: [] for hook_type in HookType
}
def register(self, hook_type: HookType, hook: Hook):
if hook_type not in HookType:
raise ValueError(f"Unsupported hook type: {hook_type}")
self._hooks[hook_type].append(hook)
def clear(self, hook_type: Optional[HookType] = None):
if hook_type:
self._hooks[hook_type] = []
else:
for ht in self._hooks:
self._hooks[ht] = []
async def trigger(self, hook_type: HookType, context: ExecutionContext):
if hook_type not in self._hooks:
raise ValueError(f"Unsupported hook type: {hook_type}")
for hook in self._hooks[hook_type]:
try:
if inspect.iscoroutinefunction(hook):
await hook(context)
else:
hook(context)
except Exception as hook_error:
logger.warning(f"⚠️ Hook '{hook.__name__}' raised an exception during '{hook_type}'"
f" for '{context.name}': {hook_error}")
if hook_type == HookType.ON_ERROR:
assert isinstance(context.exception, BaseException)
raise context.exception from hook_error

43
falyx/hooks.py Normal file
View File

@ -0,0 +1,43 @@
"""hooks.py"""
import time
from falyx.context import ExecutionContext
from falyx.exceptions import CircuitBreakerOpen
from falyx.utils import logger
class CircuitBreaker:
def __init__(self, max_failures=3, reset_timeout=10):
self.max_failures = max_failures
self.reset_timeout = reset_timeout
self.failures = 0
self.open_until = None
def before_hook(self, context: ExecutionContext):
name = context.name
if self.open_until:
if time.time() < self.open_until:
raise CircuitBreakerOpen(f"🔴 Circuit open for '{name}' until {time.ctime(self.open_until)}.")
else:
logger.info(f"🟢 Circuit closed again for '{name}'.")
self.failures = 0
self.open_until = None
def error_hook(self, context: ExecutionContext):
name = context.name
self.failures += 1
logger.warning(f"⚠️ CircuitBreaker: '{name}' failure {self.failures}/{self.max_failures}.")
if self.failures >= self.max_failures:
self.open_until = time.time() + self.reset_timeout
logger.error(f"🔴 Circuit opened for '{name}' until {time.ctime(self.open_until)}.")
def after_hook(self, context: ExecutionContext):
self.failures = 0
def is_open(self):
return self.open_until is not None and time.time() < self.open_until
def reset(self):
self.failures = 0
self.open_until = None
logger.info("🔄 Circuit reset.")

29
falyx/importer.py Normal file
View File

@ -0,0 +1,29 @@
"""importer.py"""
import importlib
from types import ModuleType
from typing import Any, Callable
def resolve_action(path: str) -> Callable[..., Any]:
"""
Resolve a dotted path to a Python callable.
Example: 'mypackage.mymodule.myfunction'
Raises:
ImportError if the module or function does not exist.
ValueError if the resolved attribute is not callable.
"""
if ":" in path:
module_path, function_name = path.split(":")
else:
*module_parts, function_name = path.split(".")
module_path = ".".join(module_parts)
module: ModuleType = importlib.import_module(module_path)
function: Any = getattr(module, function_name)
if not callable(function):
raise ValueError(f"Resolved attribute '{function_name}' is not callable.")
return function

87
falyx/main/main.py Normal file
View File

@ -0,0 +1,87 @@
import asyncio
import logging
from rich.markdown import Markdown
from falyx import Action, Falyx, HookType
from falyx.hooks import log_before, log_success, log_error, log_after
from falyx.themes.colors import OneColors
from falyx.utils import setup_logging
# Setup logging
setup_logging(console_log_level=logging.WARNING, json_log_to_file=True)
def main():
# Create the menu
menu = Falyx(
title=Markdown("# 🚀 Falyx CLI Demo"),
welcome_message="Welcome to Falyx!",
exit_message="Thanks for using Falyx!",
include_history_command=True,
include_help_command=True,
)
# Define async actions
async def hello():
print("👋 Hello from Falyx CLI!")
def goodbye():
print("👋 Goodbye from Falyx CLI!")
async def do_task_and_increment(counter_name: str = "tasks"):
await asyncio.sleep(3)
print("✅ Task completed.")
menu.bottom_bar.increment_total_counter(counter_name)
# Register global logging hooks
menu.hooks.register(HookType.BEFORE, log_before)
menu.hooks.register(HookType.ON_SUCCESS, log_success)
menu.hooks.register(HookType.ON_ERROR, log_error)
menu.hooks.register(HookType.AFTER, log_after)
# Add a toggle to the bottom bar
menu.add_toggle("D", "Debug Mode", state=False)
# Add a counter to the bottom bar
menu.add_total_counter("tasks", "Tasks", current=0, total=5)
# Add static text to the bottom bar
menu.add_static("env", "🌐 Local Env")
# Add commands with help_text
menu.add_command(
key="S",
description="Say Hello",
help_text="Greets the user with a friendly hello message.",
action=Action("Hello", hello),
color=OneColors.CYAN,
)
menu.add_command(
key="G",
description="Say Goodbye",
help_text="Bids farewell and thanks the user for using the app.",
action=Action("Goodbye", goodbye),
color=OneColors.MAGENTA,
)
menu.add_command(
key="T",
description="Run a Task",
aliases=["task", "run"],
help_text="Performs a task and increments the counter by 1.",
action=do_task_and_increment,
args=("tasks",),
color=OneColors.GREEN,
spinner=True,
)
asyncio.run(menu.cli())
if __name__ == "__main__":
"""
Entry point for the Falyx CLI demo application.
This function initializes the menu and runs it.
"""
main()

77
falyx/retry.py Normal file
View File

@ -0,0 +1,77 @@
"""retry.py"""
import asyncio
from pydantic import BaseModel, Field
from falyx.action import Action
from falyx.context import ExecutionContext
from falyx.utils import logger
class RetryPolicy(BaseModel):
max_retries: int = Field(default=3, ge=0)
delay: float = Field(default=1.0, ge=0.0)
backoff: float = Field(default=2.0, ge=1.0)
enabled: bool = False
def is_active(self) -> bool:
"""
Check if the retry policy is active.
:return: True if the retry policy is active, False otherwise.
"""
return self.max_retries > 0 and self.enabled
class RetryHandler:
def __init__(self, policy: RetryPolicy=RetryPolicy()):
self.policy = policy
def enable_policy(self, backoff=2, max_retries=3, delay=1):
self.policy.enabled = True
self.policy.max_retries = max_retries
self.policy.delay = delay
self.policy.backoff = backoff
logger.info(f"🔄 Retry policy enabled: {self.policy}")
async def retry_on_error(self, context: ExecutionContext):
name = context.name
error = context.exception
target = context.action
retries_done = 0
current_delay = self.policy.delay
last_error = error
if not target:
logger.warning(f"[{name}] ⚠️ No action target. Cannot retry.")
return
if not isinstance(target, Action):
logger.warning(f"[{name}] ❌ RetryHandler only supports only supports Action objects.")
return
if not getattr(target, "is_retryable", False):
logger.warning(f"[{name}] ❌ Not retryable.")
return
if not self.policy.enabled:
logger.warning(f"[{name}] ❌ Retry policy is disabled.")
return
while retries_done < self.policy.max_retries:
retries_done += 1
logger.info(f"[{name}] 🔄 Retrying ({retries_done}/{self.policy.max_retries}) in {current_delay}s due to '{last_error}'...")
await asyncio.sleep(current_delay)
try:
result = await target.action(*context.args, **context.kwargs)
context.result = result
context.exception = None
logger.info(f"[{name}] ✅ Retry succeeded on attempt {retries_done}.")
return
except Exception as retry_error:
last_error = retry_error
current_delay *= self.policy.backoff
logger.warning(f"[{name}] ⚠️ Retry attempt {retries_done}/{self.policy.max_retries} failed due to '{retry_error}'.")
context.exception = last_error
logger.error(f"[{name}] ❌ All {self.policy.max_retries} retries failed.")
return

513
falyx/themes/colors.py Normal file
View File

@ -0,0 +1,513 @@
"""
colors.py
A Python module that integrates the Nord color palette with the Rich library.
It defines a metaclass-based NordColors class allowing dynamic attribute lookups
(e.g., NORD12bu -> "#D08770 bold underline") and provides a comprehensive Nord-based
Theme that customizes Rich's default styles.
Features:
- All core Nord colors (NORD0 through NORD15), plus named aliases (Polar Night,
Snow Storm, Frost, Aurora).
- A dynamic metaclass (NordMeta) that enables usage of 'NORD1b', 'NORD1_biudrs', etc.
to return color + bold/italic/underline/dim/reverse/strike flags for Rich.
- A ready-to-use Theme (get_nord_theme) mapping Rich's default styles to Nord colors.
Example dynamic usage:
console.print("Hello!", style=NordColors.NORD12bu)
# => Renders "Hello!" in #D08770 (Nord12) plus bold and underline styles
"""
import re
from difflib import get_close_matches
from rich.console import Console
from rich.style import Style
from rich.theme import Theme
class ColorsMeta(type):
"""
A metaclass that catches attribute lookups like `NORD12buidrs` or `ORANGE_b` and returns
a string combining the base color + bold/italic/underline/dim/reverse/strike flags.
The color values are required to be uppercase with optional underscores and digits,
and style flags are required to be lowercase letters:
- 'b' for bold
- 'i' for italic
- 'u' for underline
- 'd' for dim
- 'r' for reverse
- 's' for strike
Example:
'NORD3bu' => '#4C566A bold underline'
If an invalid base color or style flag is used, a helpful AttributeError is raised.
"""
_STYLE_MAP = {
"b": "bold",
"i": "italic",
"u": "underline",
"d": "dim",
"r": "reverse",
"s": "strike",
}
_cache: dict = {}
def __getattr__(cls, name: str) -> str:
"""
Intercepts attributes like 'NORD12b' or 'POLAR_NIGHT_BRIGHT_biu'.
Splits into a valid base color attribute (e.g. 'POLAR_NIGHT_BRIGHT') and suffix
characters 'b', 'i', 'u', 'd', 'r', 's' which map to 'bold', 'italic', 'underline',
'dim', 'reverse', 'strike'.
Returns a string Rich can parse: e.g. '#3B4252 bold italic underline'.
Raises an informative AttributeError if invalid base or style flags are used.
"""
if name in cls._cache:
return cls._cache[name]
match = re.match(r"([A-Z]+(?:_[A-Z]+)*[0-9]*)(?:_)?([biudrs]*)", name)
if not match:
raise AttributeError(
f"'{cls.__name__}' has no attribute '{name}'.\n"
f"Expected format: BASE[_]?FLAGS, where BASE is uppercase letters/underscores/digits, "
f"and FLAGS ∈ {{'b', 'i', 'u', 'd', 'r', 's'}}."
)
base, suffix = match.groups()
try:
color_value = type.__getattribute__(cls, base)
except AttributeError:
error_msg = [f"'{cls.__name__}' has no color named '{base}'."]
valid_bases = [
key for key, val in cls.__dict__.items() if isinstance(val, str) and
not key.startswith("__")
]
suggestions = get_close_matches(base, valid_bases, n=1, cutoff=0.5)
if suggestions:
error_msg.append(f"Did you mean '{suggestions[0]}'?")
if valid_bases:
error_msg.append("Valid base color names include: " + ", ".join(valid_bases))
raise AttributeError(" ".join(error_msg)) from None
if not isinstance(color_value, str):
raise AttributeError(
f"'{cls.__name__}.{base}' is not a string color.\n"
f"Make sure that attribute actually contains a color string."
)
unique_flags = set(suffix)
styles = []
for letter in unique_flags:
mapped_style = cls._STYLE_MAP.get(letter)
if mapped_style:
styles.append(mapped_style)
else:
raise AttributeError(f"Unknown style flag '{letter}' in attribute '{name}'")
order = {"b": 1, "i": 2, "u": 3, "d": 4, "r": 5, "s": 6}
styles_sorted = sorted(styles, key=lambda s: order[s[0]])
if styles_sorted:
style_string = f"{color_value} {' '.join(styles_sorted)}"
else:
style_string = color_value
cls._cache[name] = style_string
return style_string
class OneColors(metaclass=ColorsMeta):
BLACK = "#282C34"
GUTTER_GREY = "#4B5263"
COMMENT_GREY = "#5C6370"
WHITE = "#ABB2BF"
DARK_RED = "#BE5046"
LIGHT_RED = "#E06C75"
DARK_YELLOW = "#D19A66"
LIGHT_YELLOW = "#E5C07B"
GREEN = "#98C379"
CYAN = "#56B6C2"
BLUE = "#61AFEF"
MAGENTA = "#C678DD"
@classmethod
def as_dict(cls):
"""
Returns a dictionary mapping every attribute
(e.g. 'NORD0') to its hex code.
"""
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if not callable(getattr(cls, attr)) and
not attr.startswith("__")
}
class NordColors(metaclass=ColorsMeta):
"""
Defines the Nord color palette as class attributes.
Each color is labeled by its canonical Nord name (NORD0-NORD15)
and also has useful aliases grouped by theme:
- Polar Night
- Snow Storm
- Frost
- Aurora
"""
# Polar Night
NORD0 = "#2E3440"
NORD1 = "#3B4252"
NORD2 = "#434C5E"
NORD3 = "#4C566A"
# Snow Storm
NORD4 = "#D8DEE9"
NORD5 = "#E5E9F0"
NORD6 = "#ECEFF4"
# Frost
NORD7 = "#8FBCBB"
NORD8 = "#88C0D0"
NORD9 = "#81A1C1"
NORD10 = "#5E81AC"
# Aurora
NORD11 = "#BF616A"
NORD12 = "#D08770"
NORD13 = "#EBCB8B"
NORD14 = "#A3BE8C"
NORD15 = "#B48EAD"
POLAR_NIGHT_ORIGIN = NORD0
POLAR_NIGHT_BRIGHT = NORD1
POLAR_NIGHT_BRIGHTER = NORD2
POLAR_NIGHT_BRIGHTEST = NORD3
SNOW_STORM_BRIGHT = NORD4
SNOW_STORM_BRIGHTER = NORD5
SNOW_STORM_BRIGHTEST = NORD6
FROST_TEAL = NORD7
FROST_ICE = NORD8
FROST_SKY = NORD9
FROST_DEEP = NORD10
RED = NORD11
ORANGE = NORD12
YELLOW = NORD13
GREEN = NORD14
PURPLE = NORD15
MAGENTA = NORD15
BLUE = NORD10
CYAN = NORD8
@classmethod
def as_dict(cls):
"""
Returns a dictionary mapping every NORD* attribute
(e.g. 'NORD0') to its hex code.
"""
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if attr.startswith("NORD") and
not callable(getattr(cls, attr))
}
@classmethod
def aliases(cls):
"""
Returns a dictionary of *all* other aliases
(Polar Night, Snow Storm, Frost, Aurora).
"""
skip_prefixes = ("NORD", "__")
alias_names = [
attr for attr in dir(cls)
if not any(attr.startswith(sp) for sp in skip_prefixes)
and not callable(getattr(cls, attr))
]
return {name: getattr(cls, name) for name in alias_names}
NORD_THEME_STYLES: dict[str, Style] = {
# ---------------------------------------------------------------
# Base / Structural styles
# ---------------------------------------------------------------
"none": Style.null(),
"reset": Style(
color="default",
bgcolor="default",
dim=False,
bold=False,
italic=False,
underline=False,
blink=False,
blink2=False,
reverse=False,
conceal=False,
strike=False,
),
"dim": Style(dim=True),
"bright": Style(dim=False),
"bold": Style(bold=True),
"strong": Style(bold=True),
"code": Style(reverse=True, bold=True),
"italic": Style(italic=True),
"emphasize": Style(italic=True),
"underline": Style(underline=True),
"blink": Style(blink=True),
"blink2": Style(blink2=True),
"reverse": Style(reverse=True),
"strike": Style(strike=True),
# ---------------------------------------------------------------
# Basic color names mapped to Nord
# ---------------------------------------------------------------
"black": Style(color=NordColors.POLAR_NIGHT_ORIGIN),
"red": Style(color=NordColors.RED),
"green": Style(color=NordColors.GREEN),
"yellow": Style(color=NordColors.YELLOW),
"magenta": Style(color=NordColors.MAGENTA),
"purple": Style(color=NordColors.PURPLE),
"cyan": Style(color=NordColors.CYAN),
"blue": Style(color=NordColors.BLUE),
"white": Style(color=NordColors.SNOW_STORM_BRIGHTEST),
# ---------------------------------------------------------------
# Inspect
# ---------------------------------------------------------------
"inspect.attr": Style(color=NordColors.YELLOW, italic=True),
"inspect.attr.dunder": Style(color=NordColors.YELLOW, italic=True, dim=True),
"inspect.callable": Style(bold=True, color=NordColors.RED),
"inspect.async_def": Style(italic=True, color=NordColors.FROST_ICE),
"inspect.def": Style(italic=True, color=NordColors.FROST_ICE),
"inspect.class": Style(italic=True, color=NordColors.FROST_ICE),
"inspect.error": Style(bold=True, color=NordColors.RED),
"inspect.equals": Style(),
"inspect.help": Style(color=NordColors.FROST_ICE),
"inspect.doc": Style(dim=True),
"inspect.value.border": Style(color=NordColors.GREEN),
# ---------------------------------------------------------------
# Live / Layout
# ---------------------------------------------------------------
"live.ellipsis": Style(bold=True, color=NordColors.RED),
"layout.tree.row": Style(dim=False, color=NordColors.RED),
"layout.tree.column": Style(dim=False, color=NordColors.FROST_DEEP),
# ---------------------------------------------------------------
# Logging
# ---------------------------------------------------------------
"logging.keyword": Style(bold=True, color=NordColors.YELLOW),
"logging.level.notset": Style(dim=True),
"logging.level.debug": Style(color=NordColors.GREEN),
"logging.level.info": Style(color=NordColors.FROST_ICE),
"logging.level.warning": Style(color=NordColors.RED),
"logging.level.error": Style(color=NordColors.RED, bold=True),
"logging.level.critical": Style(color=NordColors.RED, bold=True, reverse=True),
"log.level": Style.null(),
"log.time": Style(color=NordColors.FROST_ICE, dim=True),
"log.message": Style.null(),
"log.path": Style(dim=True),
# ---------------------------------------------------------------
# Python repr
# ---------------------------------------------------------------
"repr.ellipsis": Style(color=NordColors.YELLOW),
"repr.indent": Style(color=NordColors.GREEN, dim=True),
"repr.error": Style(color=NordColors.RED, bold=True),
"repr.str": Style(color=NordColors.GREEN, italic=False, bold=False),
"repr.brace": Style(bold=True),
"repr.comma": Style(bold=True),
"repr.ipv4": Style(bold=True, color=NordColors.GREEN),
"repr.ipv6": Style(bold=True, color=NordColors.GREEN),
"repr.eui48": Style(bold=True, color=NordColors.GREEN),
"repr.eui64": Style(bold=True, color=NordColors.GREEN),
"repr.tag_start": Style(bold=True),
"repr.tag_name": Style(color=NordColors.PURPLE, bold=True),
"repr.tag_contents": Style(color="default"),
"repr.tag_end": Style(bold=True),
"repr.attrib_name": Style(color=NordColors.YELLOW, italic=False),
"repr.attrib_equal": Style(bold=True),
"repr.attrib_value": Style(color=NordColors.PURPLE, italic=False),
"repr.number": Style(color=NordColors.FROST_ICE, bold=True, italic=False),
"repr.number_complex": Style(color=NordColors.FROST_ICE, bold=True, italic=False),
"repr.bool_true": Style(color=NordColors.GREEN, italic=True),
"repr.bool_false": Style(color=NordColors.RED, italic=True),
"repr.none": Style(color=NordColors.PURPLE, italic=True),
"repr.url": Style(underline=True, color=NordColors.FROST_ICE, italic=False, bold=False),
"repr.uuid": Style(color=NordColors.YELLOW, bold=False),
"repr.call": Style(color=NordColors.PURPLE, bold=True),
"repr.path": Style(color=NordColors.PURPLE),
"repr.filename": Style(color=NordColors.PURPLE),
# ---------------------------------------------------------------
# Rule
# ---------------------------------------------------------------
"rule.line": Style(color=NordColors.GREEN),
"rule.text": Style.null(),
# ---------------------------------------------------------------
# JSON
# ---------------------------------------------------------------
"json.brace": Style(bold=True),
"json.bool_true": Style(color=NordColors.GREEN, italic=True),
"json.bool_false": Style(color=NordColors.RED, italic=True),
"json.null": Style(color=NordColors.PURPLE, italic=True),
"json.number": Style(color=NordColors.FROST_ICE, bold=True, italic=False),
"json.str": Style(color=NordColors.GREEN, italic=False, bold=False),
"json.key": Style(color=NordColors.FROST_ICE, bold=True),
# ---------------------------------------------------------------
# Prompt
# ---------------------------------------------------------------
"prompt": Style.null(),
"prompt.choices": Style(color=NordColors.PURPLE, bold=True),
"prompt.default": Style(color=NordColors.FROST_ICE, bold=True),
"prompt.invalid": Style(color=NordColors.RED),
"prompt.invalid.choice": Style(color=NordColors.RED),
# ---------------------------------------------------------------
# Pretty
# ---------------------------------------------------------------
"pretty": Style.null(),
# ---------------------------------------------------------------
# Scope
# ---------------------------------------------------------------
"scope.border": Style(color=NordColors.FROST_ICE),
"scope.key": Style(color=NordColors.YELLOW, italic=True),
"scope.key.special": Style(color=NordColors.YELLOW, italic=True, dim=True),
"scope.equals": Style(color=NordColors.RED),
# ---------------------------------------------------------------
# Table
# ---------------------------------------------------------------
"table.header": Style(bold=True),
"table.footer": Style(bold=True),
"table.cell": Style.null(),
"table.title": Style(italic=True),
"table.caption": Style(italic=True, dim=True),
# ---------------------------------------------------------------
# Traceback
# ---------------------------------------------------------------
"traceback.error": Style(color=NordColors.RED, italic=True),
"traceback.border.syntax_error": Style(color=NordColors.RED),
"traceback.border": Style(color=NordColors.RED),
"traceback.text": Style.null(),
"traceback.title": Style(color=NordColors.RED, bold=True),
"traceback.exc_type": Style(color=NordColors.RED, bold=True),
"traceback.exc_value": Style.null(),
"traceback.offset": Style(color=NordColors.RED, bold=True),
# ---------------------------------------------------------------
# Progress bars
# ---------------------------------------------------------------
"bar.back": Style(color=NordColors.POLAR_NIGHT_BRIGHTEST),
"bar.complete": Style(color=NordColors.RED),
"bar.finished": Style(color=NordColors.GREEN),
"bar.pulse": Style(color=NordColors.RED),
"progress.description": Style.null(),
"progress.filesize": Style(color=NordColors.GREEN),
"progress.filesize.total": Style(color=NordColors.GREEN),
"progress.download": Style(color=NordColors.GREEN),
"progress.elapsed": Style(color=NordColors.YELLOW),
"progress.percentage": Style(color=NordColors.PURPLE),
"progress.remaining": Style(color=NordColors.FROST_ICE),
"progress.data.speed": Style(color=NordColors.RED),
"progress.spinner": Style(color=NordColors.GREEN),
"status.spinner": Style(color=NordColors.GREEN),
# ---------------------------------------------------------------
# Tree
# ---------------------------------------------------------------
"tree": Style(),
"tree.line": Style(),
# ---------------------------------------------------------------
# Markdown
# ---------------------------------------------------------------
"markdown.paragraph": Style(),
"markdown.text": Style(),
"markdown.em": Style(italic=True),
"markdown.emph": Style(italic=True), # For commonmark compatibility
"markdown.strong": Style(bold=True),
"markdown.code": Style(bold=True, color=NordColors.FROST_ICE, bgcolor=NordColors.POLAR_NIGHT_ORIGIN),
"markdown.code_block": Style(color=NordColors.FROST_ICE, bgcolor=NordColors.POLAR_NIGHT_ORIGIN),
"markdown.block_quote": Style(color=NordColors.PURPLE),
"markdown.list": Style(color=NordColors.FROST_ICE),
"markdown.item": Style(),
"markdown.item.bullet": Style(color=NordColors.YELLOW, bold=True),
"markdown.item.number": Style(color=NordColors.YELLOW, bold=True),
"markdown.hr": Style(color=NordColors.YELLOW),
"markdown.h1.border": Style(),
"markdown.h1": Style(bold=True),
"markdown.h2": Style(bold=True, underline=True),
"markdown.h3": Style(bold=True),
"markdown.h4": Style(bold=True, dim=True),
"markdown.h5": Style(underline=True),
"markdown.h6": Style(italic=True),
"markdown.h7": Style(italic=True, dim=True),
"markdown.link": Style(color=NordColors.FROST_ICE),
"markdown.link_url": Style(color=NordColors.FROST_SKY, underline=True),
"markdown.s": Style(strike=True),
# ---------------------------------------------------------------
# ISO8601
# ---------------------------------------------------------------
"iso8601.date": Style(color=NordColors.FROST_ICE),
"iso8601.time": Style(color=NordColors.PURPLE),
"iso8601.timezone": Style(color=NordColors.YELLOW),
}
def get_nord_theme() -> Theme:
"""
Returns a Rich Theme for the Nord color palette.
"""
return Theme(NORD_THEME_STYLES)
if __name__ == "__main__":
console = Console(theme=get_nord_theme(), color_system="truecolor")
# Basic demonstration of the Nord theme
console.print("Welcome to the [bold underline]Nord Themed[/] console!\n")
console.print("1) This is default text (no style).")
console.print("2) This is [red]red[/].")
console.print("3) This is [green]green[/].")
console.print("4) This is [blue]blue[/] (maps to Frost).")
console.print("5) And here's some [bold]Bold text[/] and [italic]italic text[/].\n")
console.log("Log example in info mode.")
console.log("Another log, with a custom style", style="logging.level.warning")
# Demonstrate the dynamic attribute usage
console.print(
"6) Demonstrating dynamic attribute [NORD3bu]: This text should be bold, underlined, "
"and use Nord3's color (#4C566A).",
style=NordColors.NORD3bu,
)
console.print()
# Show how the custom attribute can fail gracefully
try:
console.print("7) Attempting invalid suffix [NORD3z]:", style=NordColors.NORD3z)
except AttributeError as error:
console.print(f"Caught error: {error}", style="red")
# Demonstrate a traceback style:
console.print("\n8) Raising and displaying a traceback with Nord styling:\n", style="bold")
try:
raise ValueError("Nord test exception!")
except ValueError:
console.print_exception(show_locals=True)
console.print("\nEnd of Nord theme demo!", style="bold")

192
falyx/utils.py Normal file
View File

@ -0,0 +1,192 @@
"""utils.py"""
import functools
import inspect
import logging
import os
from itertools import islice
from typing import Any, Awaitable, Callable, TypeVar
import pythonjsonlogger.json
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import (AnyFormattedText, FormattedText,
merge_formatted_text)
from rich.logging import RichHandler
from falyx.themes.colors import OneColors
logger = logging.getLogger("falyx")
T = TypeVar("T")
async def _noop(*args, **kwargs):
pass
def is_coroutine(function: Callable[..., Any]) -> bool:
return inspect.iscoroutinefunction(function)
def ensure_async(function: Callable[..., T]) -> Callable[..., Awaitable[T]]:
if is_coroutine(function):
return function # type: ignore
@functools.wraps(function)
async def async_wrapper(*args, **kwargs) -> T:
return function(*args, **kwargs)
return async_wrapper
def chunks(iterator, size):
"""Yield successive n-sized chunks from an iterator."""
iterator = iter(iterator)
while True:
chunk = list(islice(iterator, size))
if not chunk:
break
yield chunk
async def async_confirm(message: AnyFormattedText = "Are you sure?") -> bool:
session: PromptSession = PromptSession()
while True:
merged_message: AnyFormattedText = merge_formatted_text([message, FormattedText([(OneColors.LIGHT_YELLOW_b, " [Y/n] ")])])
answer: str = (await session.prompt_async(merged_message)).strip().lower()
if answer in ("y", "yes"):
return True
if answer in ("n", "no", ""):
return False
print("Please enter y or n.")
class CaseInsensitiveDict(dict):
"""A case-insensitive dictionary that treats all keys as uppercase."""
def __setitem__(self, key, value):
super().__setitem__(key.upper(), value)
def __getitem__(self, key):
return super().__getitem__(key.upper())
def __contains__(self, key):
return super().__contains__(key.upper())
def get(self, key, default=None):
return super().get(key.upper(), default)
def pop(self, key, default=None):
return super().pop(key.upper(), default)
def update(self, other=None, **kwargs):
if other:
other = {k.upper(): v for k, v in other.items()}
kwargs = {k.upper(): v for k, v in kwargs.items()}
super().update(other, **kwargs)
def running_in_container() -> bool:
try:
with open("/proc/1/cgroup", "r", encoding="UTF-8") as f:
content = f.read()
return (
"docker" in content
or "kubepods" in content
or "containerd" in content
or "podman" in content
)
except Exception:
return False
def setup_logging(
mode: str | None = None,
log_filename: str = "falyx.log",
json_log_to_file: bool = False,
file_log_level: int = logging.DEBUG,
console_log_level: int = logging.WARNING,
):
"""
Configure logging for Falyx with support for both CLI-friendly and structured JSON output.
This function sets up separate logging handlers for console and file output, with optional
support for JSON formatting. It also auto-detects whether the application is running inside
a container to default to machine-readable logs when appropriate.
Args:
mode (str | None):
Logging output mode. Can be:
- "cli": human-readable Rich console logs (default outside containers)
- "json": machine-readable JSON logs (default inside containers)
If not provided, it will use the `FALYX_LOG_MODE` environment variable
or fallback based on container detection.
log_filename (str):
Path to the log file for file-based logging output. Defaults to "falyx.log".
json_log_to_file (bool):
Whether to format file logs as JSON (structured) instead of plain text.
Defaults to False.
file_log_level (int):
Logging level for file output. Defaults to `logging.DEBUG`.
console_log_level (int):
Logging level for console output. Defaults to `logging.WARNING`.
Behavior:
- Clears existing root handlers before setup.
- Configures console logging using either Rich (for CLI) or JSON formatting.
- Configures file logging in plain text or JSON based on `json_log_to_file`.
- Automatically sets logging levels for noisy third-party modules (`urllib3`, `asyncio`).
- Propagates logs from the "falyx" logger to ensure centralized output.
Raises:
ValueError: If an invalid logging `mode` is passed.
Environment Variables:
FALYX_LOG_MODE: Can override `mode` to enforce "cli" or "json" logging behavior.
"""
if not mode:
mode = os.getenv("FALYX_LOG_MODE") or (
"json" if running_in_container() else "cli"
)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
if root.hasHandlers():
root.handlers.clear()
if mode == "cli":
console_handler: RichHandler | logging.StreamHandler = RichHandler(
rich_tracebacks=True,
show_time=True,
show_level=True,
show_path=False,
markup=True,
log_time_format="[%Y-%m-%d %H:%M:%S]",
)
elif mode == "json":
console_handler = logging.StreamHandler()
console_handler.setFormatter(
pythonjsonlogger.json.JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
else:
raise ValueError(f"Invalid log mode: {mode}")
console_handler.setLevel(console_log_level)
root.addHandler(console_handler)
file_handler = logging.FileHandler(log_filename)
file_handler.setLevel(file_log_level)
if json_log_to_file:
file_handler.setFormatter(
pythonjsonlogger.json.JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
else:
file_handler.setFormatter(logging.Formatter(
"%(asctime)s [%(name)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
))
root.addHandler(file_handler)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("markdown_it").setLevel(logging.WARNING)
logger = logging.getLogger("falyx")
logger.propagate = True
logger.debug("Logging initialized in '%s' mode.", mode)

1
falyx/version.py Normal file
View File

@ -0,0 +1 @@
__version__ = "0.1.0"

430
pylintrc Normal file
View File

@ -0,0 +1,430 @@
# This Pylint rcfile contains a best-effort configuration to uphold the
# best-practices and style described in the Google Python style guide:
# https://google.github.io/styleguide/pyguide.html
#
# Its canonical open-source location is:
# https://google.github.io/styleguide/pylintrc
[MASTER]
# Files or directories to be skipped. They should be base names, not paths.
ignore=third_party
# Files or directories matching the regex patterns are skipped. The regex
# matches against base names, not paths.
ignore-patterns=
# Pickle collected data for later comparisons.
persistent=no
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
# Use multiple processes to speed up Pylint.
jobs=4
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED
confidence=
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
#enable=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once).You can also use "--disable=all" to
# disable everything first and then reenable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=abstract-method,
apply-builtin,
arguments-differ,
attribute-defined-outside-init,
backtick,
bad-option-value,
basestring-builtin,
buffer-builtin,
c-extension-no-member,
consider-using-enumerate,
cmp-builtin,
cmp-method,
coerce-builtin,
coerce-method,
delslice-method,
div-method,
duplicate-code,
eq-without-hash,
execfile-builtin,
file-builtin,
filter-builtin-not-iterating,
fixme,
getslice-method,
global-statement,
hex-method,
idiv-method,
implicit-str-concat,
import-error,
import-self,
import-star-module-level,
inconsistent-return-statements,
input-builtin,
intern-builtin,
invalid-str-codec,
locally-disabled,
long-builtin,
long-suffix,
map-builtin-not-iterating,
misplaced-comparison-constant,
missing-function-docstring,
metaclass-assignment,
next-method-called,
next-method-defined,
no-absolute-import,
no-else-break,
no-else-continue,
no-else-raise,
no-else-return,
no-init, # added
no-member,
no-name-in-module,
no-self-use,
nonzero-method,
oct-method,
old-division,
old-ne-operator,
old-octal-literal,
old-raise-syntax,
parameter-unpacking,
print-statement,
raising-string,
range-builtin-not-iterating,
raw_input-builtin,
rdiv-method,
reduce-builtin,
relative-import,
reload-builtin,
round-builtin,
setslice-method,
signature-differs,
standarderror-builtin,
suppressed-message,
sys-max-int,
too-few-public-methods,
too-many-ancestors,
too-many-arguments,
too-many-boolean-expressions,
too-many-branches,
too-many-instance-attributes,
too-many-locals,
too-many-nested-blocks,
too-many-public-methods,
too-many-return-statements,
too-many-statements,
trailing-newlines,
unichr-builtin,
unicode-builtin,
unnecessary-pass,
unpacking-in-except,
useless-else-on-loop,
useless-object-inheritance,
useless-suppression,
using-cmp-argument,
wrong-import-order,
xrange-builtin,
zip-builtin-not-iterating,
broad-exception-caught
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html. You can also give a reporter class, eg
# mypackage.mymodule.MyReporterClass.
output-format=text
# Tells whether to display a full report or only the messages
reports=no
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables errors warning, statement which
# respectively contain the number of errors / warnings messages and the total
# number of statements analyzed. This is used by the global evaluation report
# (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details
#msg-template=
[BASIC]
# Good variable names which should always be accepted, separated by a comma
good-names=main,_
# Bad variable names which should always be refused, separated by a comma
bad-names=
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Include a hint for the correct naming format with invalid-name
include-naming-hint=no
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.
property-classes=abc.abstractproperty,cached_property.cached_property,cached_property.threaded_cached_property,cached_property.cached_property_with_ttl,cached_property.threaded_cached_property_with_ttl
# Regular expression matching correct function names
function-rgx=^(?:(?P<exempt>setUp|tearDown|setUpModule|tearDownModule)|(?P<camel_case>_?[A-Z][a-zA-Z0-9]*)|(?P<snake_case>_?[a-z][a-z0-9_]*))$
# Regular expression matching correct variable names
variable-rgx=^[a-z][a-z0-9_]*$
# Regular expression matching correct constant names
const-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$
# Regular expression matching correct attribute names
attr-rgx=^_{0,2}[a-z][a-z0-9_]*$
# Regular expression matching correct argument names
argument-rgx=^[a-z][a-z0-9_]*$
# Regular expression matching correct class attribute names
class-attribute-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$
# Regular expression matching correct inline iteration names
inlinevar-rgx=^[a-z][a-z0-9_]*$
# Regular expression matching correct class names
class-rgx=^_?[A-Z][a-zA-Z0-9]*$
# Regular expression matching correct module names
module-rgx=^(_?[a-z][a-z0-9_]*|__init__)$
# Regular expression matching correct method names
method-rgx=(?x)^(?:(?P<exempt>_[a-z0-9_]+__|runTest|setUp|tearDown|setUpTestCase|tearDownTestCase|setupSelf|tearDownClass|setUpClass|(test|assert)_*[A-Z0-9][a-zA-Z0-9_]*|next)|(?P<camel_case>_{0,2}[A-Z][a-zA-Z0-9_]*)|(?P<snake_case>_{0,2}[a-z][a-z0-9_]*))$
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=(__.*__|main|test.*|.*test|.*Test)$
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=10
[TYPECHECK]
# List of decorators that produce context managers, such as
# contextlib.contextmanager. Add to this list to register other decorators that
# produce valid context managers.
contextmanager-decorators=contextlib.contextmanager,contextlib2.contextmanager
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis. It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=optparse.Values,thread._local,_thread._local
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=80
# TODO(https://github.com/PyCQA/pylint/issues/3352): Direct pylint to exempt
# lines made too long by directives to pytype.
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=(?x)(
^\s*(\#\ )?<?https?://\S+>?$|
^\s*(from\s+\S+\s+)?import\s+.+$)
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=yes
# Maximum number of lines in a module
max-module-lines=99999
# String used as indentation unit. The internal Google style guide mandates 2
# spaces. Google's externaly-published style guide says 4, consistent with
# PEP 8. Here, we use 2 spaces, for conformity with many open-sourced Google
# projects (like TensorFlow).
indent-string=' '
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=TODO
[STRING]
# This flag controls whether inconsistent-quotes generates a warning when the
# character used as a quote delimiter is used inconsistently within a module.
check-quote-consistency=yes
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=no
# A regular expression matching the name of dummy variables (i.e. expectedly
# not used).
dummy-variables-rgx=^\*{0,2}(_$|unused_|dummy_)
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,_cb
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six,six.moves,past.builtins,future.builtins,functools
[LOGGING]
# Logging modules to check that the string format arguments are in logging
# function parameter format
logging-modules=logging,absl.logging,tensorflow.io.logging
[SIMILARITIES]
# Minimum lines number of a similarity.
min-similarity-lines=4
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=no
[SPELLING]
# Spelling dictionary name. Available dictionaries: none. To make it working
# install python-enchant package.
spelling-dict=
# List of comma separated words that should not be checked.
spelling-ignore-words=
# A path to a file that contains private dictionary; one word per line.
spelling-private-dict-file=
# Tells whether to store unknown words to indicated private dictionary in
# --spelling-private-dict-file option instead of raising a message.
spelling-store-unknown-words=no
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,
TERMIOS,
Bastion,
rexec,
sets
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant, absl
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,
__new__,
setUp
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls,
class_
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=mcs
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "Exception"
overgeneral-exceptions=builtins.StandardError,
builtins.Exception,
builtins.BaseException

35
pyproject.toml Normal file
View File

@ -0,0 +1,35 @@
[tool.poetry]
name = "falyx"
version = "0.1.0"
description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT"
readme = "README.md"
packages = [{ include = "falyx" }]
[tool.poetry.dependencies]
python = ">=3.10"
prompt_toolkit = "^3.0"
rich = "^13.0"
pydantic = "^2.0"
[tool.poetry.group.dev.dependencies]
pytest = "^7.0"
pytest-asyncio = "^0.20"
ruff = "^0.3"
[tool.poetry.scripts]
falyx = "falyx.cli.main:main"
sync-version = "scripts.sync_version:main"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
[tool.pylint."MESSAGES CONTROL"]
disable = ["broad-exception-caught"]

17
scripts/sync_version.py Normal file
View File

@ -0,0 +1,17 @@
"""scripts/sync_version.py"""
import toml
from pathlib import Path
def main():
pyproject_path = Path(__file__).parent.parent / "pyproject.toml"
version_path = Path(__file__).parent.parent / "falyx" / "version.py"
data = toml.load(pyproject_path)
version = data["tool"]["poetry"]["version"]
version_path.write_text(f'__version__ = "{version}"\n')
print(f"✅ Synced version: {version}{version_path}")
if __name__ == "__main__":
main()

18
setup.py Normal file
View File

@ -0,0 +1,18 @@
from setuptools import setup, find_packages
setup(
name="falyx",
version="0.0.1",
description="Reserved package name for future CLI framework.",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
author="Roland Thomas Jr",
author_email="roland@rtj.dev",
packages=find_packages(),
python_requires=">=3.10",
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Development Status :: 1 - Planning",
],
)

230
tests/test_actions.py Normal file
View File

@ -0,0 +1,230 @@
import pytest
import asyncio
import pickle
import warnings
from falyx.action import Action, ChainedAction, ActionGroup, ProcessAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.context import ExecutionContext, ResultsContext
asyncio_default_fixture_loop_scope = "function"
# --- Helpers ---
async def dummy_action(x: int = 0) -> int:
return x + 1
async def capturing_hook(context: ExecutionContext):
context.extra["hook_triggered"] = True
# --- Fixtures ---
@pytest.fixture
def sample_action():
return Action(name="increment", action=dummy_action, kwargs={"x": 5})
@pytest.fixture
def hook_manager():
hm = HookManager()
hm.register(HookType.BEFORE, capturing_hook)
return hm
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
yield
er.clear()
# --- Tests ---
@pytest.mark.asyncio
async def test_action_runs_correctly(sample_action):
result = await sample_action()
assert result == 6
@pytest.mark.asyncio
async def test_action_hook_lifecycle(hook_manager):
action = Action(
name="hooked",
action=lambda: 42,
hooks=hook_manager
)
await action()
context = er.get_latest()
assert context.name == "hooked"
assert context.extra.get("hook_triggered") is True
@pytest.mark.asyncio
async def test_chained_action_with_result_injection():
actions = [
Action(name="start", action=lambda: 1),
Action(name="add_last", action=lambda last_result: last_result + 5, inject_last_result=True),
Action(name="multiply", action=lambda last_result: last_result * 2, inject_last_result=True)
]
chain = ChainedAction(name="test_chain", actions=actions, inject_last_result=True)
result = await chain()
assert result == [1, 6, 12]
@pytest.mark.asyncio
async def test_action_group_runs_in_parallel():
actions = [
Action(name="a", action=lambda: 1),
Action(name="b", action=lambda: 2),
Action(name="c", action=lambda: 3),
]
group = ActionGroup(name="parallel", actions=actions)
result = await group()
result_dict = dict(result)
assert result_dict == {"a": 1, "b": 2, "c": 3}
@pytest.mark.asyncio
async def test_chained_action_inject_from_action():
inner_chain = ChainedAction(
name="inner_chain",
actions=[
Action(name="inner_first", action=lambda last_result: last_result + 10, inject_last_result=True),
Action(name="inner_second", action=lambda last_result: last_result + 5, inject_last_result=True),
]
)
actions = [
Action(name="first", action=lambda: 1),
Action(name="second", action=lambda last_result: last_result + 2, inject_last_result=True),
inner_chain,
]
outer_chain = ChainedAction(name="test_chain", actions=actions)
result = await outer_chain()
assert result == [1, 3, [13, 18]]
@pytest.mark.asyncio
async def test_chained_action_with_group():
group = ActionGroup(
name="group",
actions=[
Action(name="a", action=lambda last_result: last_result + 1, inject_last_result=True),
Action(name="b", action=lambda last_result: last_result + 2, inject_last_result=True),
Action(name="c", action=lambda: 3),
]
)
actions = [
Action(name="first", action=lambda: 1),
Action(name="second", action=lambda last_result: last_result + 2, inject_last_result=True),
group,
]
chain = ChainedAction(name="test_chain", actions=actions)
result = await chain()
assert result == [1, 3, [("a", 4), ("b", 5), ("c", 3)]]
@pytest.mark.asyncio
async def test_action_error_triggers_error_hook():
def fail():
raise ValueError("boom")
hooks = HookManager()
flag = {}
async def error_hook(ctx):
flag["called"] = True
hooks.register(HookType.ON_ERROR, error_hook)
action = Action(name="fail_action", action=fail, hooks=hooks)
with pytest.raises(ValueError):
await action()
assert flag.get("called") is True
@pytest.mark.asyncio
async def test_chained_action_rollback_on_failure():
rollback_called = []
async def success():
return "ok"
async def fail():
raise RuntimeError("fail")
async def rollback_fn():
rollback_called.append("rolled back")
actions = [
Action(name="ok", action=success, rollback=rollback_fn),
Action(name="fail", action=fail, rollback=rollback_fn)
]
chain = ChainedAction(name="chain", actions=actions)
with pytest.raises(RuntimeError):
await chain()
assert rollback_called == ["rolled back"]
def slow_add(x, y):
return x + y
@pytest.mark.asyncio
async def test_process_action_executes_correctly():
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
action = ProcessAction(name="proc", func=slow_add, args=(2, 3))
result = await action()
assert result == 5
unpickleable = lambda x: x + 1
@pytest.mark.asyncio
async def test_process_action_rejects_unpickleable():
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
action = ProcessAction(name="proc_fail", func=unpickleable, args=(2,))
with pytest.raises(pickle.PicklingError, match="Can't pickle"):
await action()
@pytest.mark.asyncio
async def test_register_hooks_recursively_propagates():
hook = lambda ctx: ctx.extra.update({"test_marker": True})
chain = ChainedAction(name="chain", actions=[
Action(name="a", action=lambda: 1),
Action(name="b", action=lambda: 2),
])
chain.register_hooks_recursively(HookType.BEFORE, hook)
await chain()
for ctx in er.get_by_name("a") + er.get_by_name("b"):
assert ctx.extra.get("test_marker") is True
@pytest.mark.asyncio
async def test_action_hook_recovers_error():
async def flaky():
raise ValueError("fail")
async def recovery_hook(ctx):
ctx.result = 99
ctx.exception = None
hooks = HookManager()
hooks.register(HookType.ON_ERROR, recovery_hook)
action = Action(name="recovering", action=flaky, hooks=hooks)
result = await action()
assert result == 99
@pytest.mark.asyncio
async def test_action_group_injects_last_result():
group = ActionGroup(name="group", actions=[
Action(name="g1", action=lambda last_result: last_result + 10, inject_last_result=True),
Action(name="g2", action=lambda last_result: last_result + 20, inject_last_result=True),
])
chain = ChainedAction(name="with_group", actions=[
Action(name="first", action=lambda: 5),
group,
])
result = await chain()
result_dict = dict(result[1])
assert result_dict == {"g1": 15, "g2": 25}