Solidify retry interface for Action, Command

This commit is contained in:
Roland Thomas Jr 2025-04-15 22:36:06 -04:00
parent 7a5ed20b33
commit b9db1cbb36
Signed by: roland
GPG Key ID: 7C3C2B085A4C2872
6 changed files with 170 additions and 90 deletions

104
README.md
View File

@ -1,6 +1,9 @@
# ⚔️ Falyx
![Python](https://img.shields.io/badge/Python-3.10+-blue)
![License](https://img.shields.io/badge/license-MIT-green)
![Async-Ready](https://img.shields.io/badge/asyncio-ready-purple)
**Falyx** is a resilient, introspectable CLI framework for building robust, asynchronous command-line workflows with:
**Falyx** is a battle-ready, introspectable CLI framework for building resilient, asynchronous workflows with:
- ✅ Modular action chaining and rollback
- 🔁 Built-in retry handling
@ -22,7 +25,7 @@ Modern CLI tools deserve the same resilience as production systems. Falyx makes
- Handle flaky operations with retries and exponential backoff
- Roll back safely on failure with structured undo logic
- Add observability with execution timing, result tracking, and hooks
- Run in both interactive or headless (scriptable) modes
- Run in both interactive *and* headless (scriptable) modes
- Customize output with Rich `Table`s (grouping, theming, etc.)
---
@ -46,33 +49,63 @@ poetry install
## ⚡ Quick Example
```python
from falyx import Action, ChainedAction, Menu
from falyx.hooks import RetryHandler, log_success
import asyncio
import random
from falyx import Falyx, Action, ChainedAction
# A flaky async step that fails randomly
async def flaky_step():
import random, asyncio
await asyncio.sleep(0.2)
if random.random() < 0.8:
if random.random() < 0.5:
raise RuntimeError("Random failure!")
return "ok"
retry = RetryHandler()
# Create the actions
step1 = Action(name="step_1", action=flaky_step, retry=True)
step2 = Action(name="step_2", action=flaky_step, retry=True)
step1 = Action("Step 1", flaky_step)
step1.hooks.register("on_error", retry.retry_on_error)
# Chain the actions
chain = ChainedAction(name="my_pipeline", actions=[step1, step2])
step2 = Action("Step 2", flaky_step)
step2.hooks.register("on_error", retry.retry_on_error)
chain = ChainedAction("My Pipeline", [step1, step2])
chain.hooks.register("on_success", log_success)
menu = Menu(title="🚀 Falyx Demo")
menu.add_command("R", "Run My Pipeline", chain)
# Create the CLI menu
falyx = Falyx("🚀 Falyx Demo")
falyx.add_command(
key="R",
description="Run My Pipeline",
action=chain,
logging_hooks=True,
# shows preview before confirmation
preview_before_confirm=True,
confirm=True,
)
# Entry point
if __name__ == "__main__":
import asyncio
asyncio.run(menu.run())
asyncio.run(falyx.run())
```
```bash
python simple.py
🚀 Falyx Demo
[R] Run My Pipeline
[Y] History [Q] Exit
>
```
```bash
python simple.py run R
Command: 'R' — Run My Pipeline
└── ⛓ ChainedAction 'my_pipeline'
├── ⚙ Action 'step_1'
│ ↻ Retries: 3x, delay 1.0s, backoff 2.0x
└── ⚙ Action 'step_2'
↻ Retries: 3x, delay 1.0s, backoff 2.0x
Confirm execution of R — Run My Pipeline (calls `my_pipeline`) [Y/n] y
[2025-04-15 22:03:57] WARNING ⚠️ Retry attempt 1/3 failed due to 'Random failure!'.
✅ Result: ['ok', 'ok']
```
---
@ -101,17 +134,28 @@ if __name__ == "__main__":
---
## 🧩 Components
### 🧱 Core Building Blocks
| Component | Purpose |
|------------------|--------------------------------------------------------|
| `Action` | Single async task with hook + result injection support |
| `ChainedAction` | Sequential task runner with rollback |
| `ActionGroup` | Parallel runner for independent tasks |
| `ProcessAction` | CPU-bound task in a separate process (multiprocessing) |
| `Menu` | CLI runner with toggleable prompt or headless mode |
| `ExecutionContext`| Captures metadata per execution |
| `HookManager` | Lifecycle hook registration engine |
#### `Action`
A single async unit of work. Can retry, roll back, or inject prior results.
#### `ChainedAction`
Run tasks in sequence. Supports rollback on failure and context propagation.
#### `ActionGroup`
Run tasks in parallel. Useful for fan-out operations like batch API calls.
#### `ProcessAction`
Offload CPU-bound work to another process — no extra code needed.
#### `Falyx`
Your CLI controller — powers menus, subcommands, history, bottom bars, and more.
#### `ExecutionContext`
Tracks metadata, arguments, timing, and results for each action execution.
#### `HookManager`
Registers and triggers lifecycle hooks (`before`, `after`, `on_error`, etc.) for actions and commands.
---
@ -125,7 +169,6 @@ Falyx is designed for developers who dont just want CLI tools to run — they
## 🛣️ Roadmap
- [ ] Retry policy DSL (e.g., `max_retries=3, backoff="exponential"`)
- [ ] Metrics export (Prometheus-style)
- [ ] Plugin system for menu extensions
- [ ] Native support for structured logs + log forwarding
@ -142,4 +185,3 @@ MIT — use it, fork it, improve it. Attribution appreciated!
## 🌐 falyx.dev — **reliable actions, resilient flows**
---

View File

@ -23,7 +23,8 @@ from rich.tree import Tree
from falyx.context import ExecutionContext, ResultsContext
from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.retry import RetryHandler, RetryPolicy
from falyx.themes.colors import OneColors
from falyx.utils import ensure_async, logger
@ -107,6 +108,19 @@ class BaseAction(ABC):
def __repr__(self):
return str(self)
@classmethod
def enable_retries_recursively(cls, action: BaseAction, policy: RetryPolicy | None):
if not policy:
policy = RetryPolicy(enabled=True)
if isinstance(action, Action):
action.retry_policy = policy
action.retry_policy.enabled = True
action.hooks.register("on_error", RetryHandler(policy).retry_on_error)
if hasattr(action, "actions"):
for sub in action.actions:
cls.enable_retries_recursively(sub, policy)
class Action(BaseAction):
"""A simple action that runs a callable. It can be a function or a coroutine."""
@ -120,6 +134,8 @@ class Action(BaseAction):
hooks: HookManager | None = None,
inject_last_result: bool = False,
inject_last_result_as: str = "last_result",
retry: bool = False,
retry_policy: RetryPolicy | None = None,
) -> None:
super().__init__(name, hooks, inject_last_result, inject_last_result_as)
self.action = ensure_async(action)
@ -127,6 +143,21 @@ class Action(BaseAction):
self.args = args
self.kwargs = kwargs or {}
self.is_retryable = True
self.retry_policy = retry_policy or RetryPolicy()
if retry or (retry_policy and retry_policy.enabled):
self.enable_retry()
def enable_retry(self):
"""Enable retry with the existing retry policy."""
self.retry_policy.enabled = True
logger.debug(f"[Action:{self.name}] Registering retry handler")
handler = RetryHandler(self.retry_policy)
self.hooks.register(HookType.ON_ERROR, handler.retry_on_error)
def set_retry_policy(self, policy: RetryPolicy):
"""Set a new retry policy and re-register the handler."""
self.retry_policy = policy
self.enable_retry()
async def _run(self, *args, **kwargs) -> Any:
combined_args = args + self.args
@ -159,13 +190,19 @@ class Action(BaseAction):
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.GREEN_b}]⚙ Action[/] '{self.name}'"
label = [f"[{OneColors.GREEN_b}]⚙ Action[/] '{self.name}'"]
if self.inject_last_result:
label += f" [dim](injects '{self.inject_last_result_as}')[/dim]"
label.append(f" [dim](injects '{self.inject_last_result_as}')[/dim]")
if self.retry_policy.enabled:
label.append(
f"\n[dim]↻ Retries:[/] {self.retry_policy.max_retries}x, "
f"delay {self.retry_policy.delay}s, backoff {self.retry_policy.backoff}x"
)
if parent:
parent.add(label)
parent.add("".join(label))
else:
console.print(Tree(label))
console.print(Tree("".join(label)))
class ActionListMixin:
@ -267,8 +304,8 @@ class ChainedAction(BaseAction, ActionListMixin):
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.CYAN_b}]⛓ ChainedAction[/] '{self.name}'"
if self.inject_last_result:
label += f" [dim](injects '{self.inject_last_result_as}')[/dim]"
tree = parent.add(label) if parent else Tree(label)
label.append(f" [dim](injects '{self.inject_last_result_as}')[/dim]")
tree = parent.add("".join(label)) if parent else Tree("".join(label))
for action in self.actions:
await action.preview(parent=tree)
if not parent:
@ -345,10 +382,10 @@ class ActionGroup(BaseAction, ActionListMixin):
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.MAGENTA_b}]⏩ ActionGroup (parallel)[/] '{self.name}'"
label = [f"[{OneColors.MAGENTA_b}]⏩ ActionGroup (parallel)[/] '{self.name}'"]
if self.inject_last_result:
label += f" [dim](receives '{self.inject_last_result_as}')[/dim]"
tree = parent.add(label) if parent else Tree(label)
label.append(f" [dim](receives '{self.inject_last_result_as}')[/dim]")
tree = parent.add("".join(label)) if parent else Tree("".join(label))
actions = self.actions.copy()
random.shuffle(actions)
await asyncio.gather(*(action.preview(parent=tree) for action in actions))
@ -422,13 +459,13 @@ class ProcessAction(BaseAction):
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.DARK_YELLOW_b}]🧠 ProcessAction (new process)[/] '{self.name}'"
label = [f"[{OneColors.DARK_YELLOW_b}]🧠 ProcessAction (new process)[/] '{self.name}'"]
if self.inject_last_result:
label += f" [dim](injects '{self.inject_last_result_as}')[/dim]"
label.append(f" [dim](injects '{self.inject_last_result_as}')[/dim]")
if parent:
parent.add(label)
parent.add("".join(label))
else:
console.print(Tree(label))
console.print(Tree("".join(label)))
def _validate_pickleable(self, obj: Any) -> bool:
try:

View File

@ -16,12 +16,12 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from rich.console import Console
from rich.tree import Tree
from falyx.action import BaseAction
from falyx.action import Action, BaseAction
from falyx.context import ExecutionContext
from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.retry import RetryHandler, RetryPolicy
from falyx.retry import RetryPolicy
from falyx.themes.colors import OneColors
from falyx.utils import _noop, ensure_async, logger
@ -47,6 +47,8 @@ class Command(BaseModel):
spinner_style: str = OneColors.CYAN
spinner_kwargs: dict[str, Any] = Field(default_factory=dict)
hooks: "HookManager" = Field(default_factory=HookManager)
retry: bool = False
retry_all: bool = False
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
tags: list[str] = Field(default_factory=list)
logging_hooks: bool = False
@ -57,26 +59,18 @@ class Command(BaseModel):
def model_post_init(self, __context: Any) -> None:
"""Post-initialization to set up the action and hooks."""
self._auto_register_retry_hook()
if self.retry and isinstance(self.action, Action):
self.action.enable_retry()
elif self.retry_policy and isinstance(self.action, Action):
self.action.set_retry_policy(self.retry_policy)
elif self.retry:
logger.warning(f"[Command:{self.key}] Retry requested, but action is not an Action instance.")
if self.retry_all:
self.action.enable_retries_recursively(self.action, self.retry_policy)
if self.logging_hooks and isinstance(self.action, BaseAction):
register_debug_hooks(self.action.hooks)
def _auto_register_retry_hook(self):
if (
self.retry_policy.is_active() and
isinstance(self.action, BaseAction) and
self.action.is_retryable
):
logger.debug(f"Auto-registering retry handler for action: {self.key}")
retry_handler = RetryHandler(self.retry_policy)
self.action.hooks.register(HookType.ON_ERROR, retry_handler.retry_on_error)
elif self.retry_policy.is_active():
logger.debug(f"Retry policy is active but action is not retryable: {self.key}")
def update_retry_policy(self, policy: RetryPolicy):
self.retry_policy = policy
self._auto_register_retry_hook()
@field_validator("action", mode="before")
@classmethod
def wrap_callable_as_async(cls, action: Any) -> Any:

View File

@ -34,11 +34,11 @@ from falyx.action import BaseAction
from falyx.bottom_bar import BottomBar
from falyx.command import Command
from falyx.context import ExecutionContext
from falyx.debug import register_debug_hooks, log_before, log_success, log_error, log_after
from falyx.debug import log_after, log_before, log_error, log_success
from falyx.exceptions import (CommandAlreadyExistsError, FalyxError,
InvalidActionError, NotAFalyxError)
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType, Hook
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.retry import RetryPolicy
from falyx.themes.colors import OneColors, get_nord_theme
from falyx.utils import CaseInsensitiveDict, async_confirm, chunks, logger
@ -462,6 +462,9 @@ class Falyx:
error_hooks: list[Callable] | None = None,
teardown_hooks: list[Callable] | None = None,
tags: list[str] | None = None,
logging_hooks: bool = False,
retry: bool = False,
retry_all: bool = False,
retry_policy: RetryPolicy | None = None,
) -> Command:
"""Adds an command to the menu, preventing duplicates."""
@ -484,7 +487,10 @@ class Falyx:
spinner_style=spinner_style,
spinner_kwargs=spinner_kwargs or {},
tags=tags if tags else [],
retry_policy=retry_policy if retry_policy else RetryPolicy(),
logging_hooks=logging_hooks,
retry=retry,
retry_all=retry_all,
retry_policy=retry_policy or RetryPolicy(),
)
if hooks:
@ -749,8 +755,27 @@ class Falyx:
return parser
async def menu(self) -> None:
"""Runs the menu and handles user input."""
logger.info(f"Running menu: {self.get_title()}")
self.debug_hooks()
if self.welcome_message:
self.console.print(self.welcome_message)
while True:
self.console.print(self.table)
try:
task = asyncio.create_task(self.process_command())
should_continue = await task
if not should_continue:
break
except (EOFError, KeyboardInterrupt):
logger.info(f"[{OneColors.DARK_RED}]EOF or KeyboardInterrupt. Exiting menu.")
break
logger.info(f"Exiting menu: {self.get_title()}")
if self.exit_message:
self.console.print(self.exit_message)
async def cli(self, parser: ArgumentParser | None = None) -> None:
async def run(self, parser: ArgumentParser | None = None) -> None:
"""Run Falyx CLI with structured subcommands."""
parser = parser or self.get_arg_parser()
self.cli_args = parser.parse_args()
@ -809,23 +834,3 @@ class Falyx:
sys.exit(0)
await self.menu()
async def menu(self) -> None:
"""Runs the menu and handles user input."""
logger.info(f"Running menu: {self.get_title()}")
self.debug_hooks()
if self.welcome_message:
self.console.print(self.welcome_message)
while True:
self.console.print(self.table)
try:
task = asyncio.create_task(self.process_command())
should_continue = await task
if not should_continue:
break
except (EOFError, KeyboardInterrupt):
logger.info(f"[{OneColors.DARK_RED}]EOF or KeyboardInterrupt. Exiting menu.")
break
logger.info(f"Exiting menu: {self.get_title()}")
if self.exit_message:
self.console.print(self.exit_message)

View File

@ -41,3 +41,4 @@ class CircuitBreaker:
self.failures = 0
self.open_until = None
logger.info("🔄 Circuit reset.")

View File

@ -1,8 +1,8 @@
"""retry.py"""
import asyncio
from pydantic import BaseModel, Field
from falyx.action import Action
from falyx.context import ExecutionContext
from falyx.utils import logger
@ -33,6 +33,7 @@ class RetryHandler:
logger.info(f"🔄 Retry policy enabled: {self.policy}")
async def retry_on_error(self, context: ExecutionContext):
from falyx.action import Action
name = context.name
error = context.exception
target = context.action