io-actions #1

Merged
roland merged 3 commits from io-actions into main 2025-04-30 22:26:27 -04:00
32 changed files with 1535 additions and 284 deletions
Showing only changes of commit 80de941335 - Show all commits

View File

@ -1,3 +1,9 @@
"""
Falyx CLI Framework
Copyright (c) 2025 rtj.dev LLC.
Licensed under the MIT License. See LICENSE file for details.
"""
import logging
from .action import Action, ActionGroup, ChainedAction, ProcessAction

View File

@ -1,42 +1,77 @@
# falyx/__main__.py
"""
Falyx CLI Framework
Copyright (c) 2025 rtj.dev LLC.
Licensed under the MIT License. See LICENSE file for details.
"""
import asyncio
import logging
from falyx.action import Action
from falyx.action import Action, ActionGroup, ChainedAction
from falyx.falyx import Falyx
def build_falyx() -> Falyx:
"""Build and return a Falyx instance with all your commands."""
app = Falyx(title="🚀 Falyx CLI")
flx = Falyx(title="🚀 Falyx CLI")
# Example commands
app.add_command(
flx.add_command(
key="B",
description="Build project",
action=Action("Build", lambda: print("📦 Building...")),
tags=["build"]
)
app.add_command(
flx.add_command(
key="T",
description="Run tests",
action=Action("Test", lambda: print("🧪 Running tests...")),
tags=["test"]
)
app.add_command(
flx.add_command(
key="D",
description="Deploy project",
action=Action("Deploy", lambda: print("🚀 Deploying...")),
tags=["deploy"]
)
return app
# Example of ChainedAction (pipeline)
build_pipeline = ChainedAction(
name="Full Build Pipeline",
actions=[
Action("Clean", lambda: print("🧹 Cleaning...")),
Action("Build", lambda: print("🔨 Building...")),
Action("Package", lambda: print("📦 Packaging...")),
],
auto_inject=False,
)
flx.add_command(
key="P",
description="Run Build Pipeline",
action=build_pipeline,
tags=["build", "pipeline"]
)
# Example of ActionGroup (parallel tasks)
test_suite = ActionGroup(
name="Test Suite",
actions=[
Action("Unit Tests", lambda: print("🧪 Running unit tests...")),
Action("Integration Tests", lambda: print("🔗 Running integration tests...")),
Action("Lint", lambda: print("🧹 Running linter...")),
]
)
flx.add_command(
key="G",
description="Run All Tests",
action=test_suite,
tags=["test", "parallel"]
)
return flx
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING)
falyx = build_falyx()
asyncio.run(falyx.run())
flx = build_falyx()
asyncio.run(flx.run())

View File

@ -1,12 +1,30 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""action.py
Any Action or Command is callable and supports the signature:
result = thing(*args, **kwargs)
Core action system for Falyx.
This guarantees:
- Hook lifecycle (before/after/error/teardown)
- Timing
- Consistent return values
This module defines the building blocks for executable actions and workflows,
providing a structured way to compose, execute, recover, and manage sequences of operations.
All actions are callable and follow a unified signature:
result = action(*args, **kwargs)
Core guarantees:
- Full hook lifecycle support (before, on_success, on_error, after, on_teardown).
- Consistent timing and execution context tracking for each run.
- Unified, predictable result handling and error propagation.
- Optional last_result injection to enable flexible, data-driven workflows.
- Built-in support for retries, rollbacks, parallel groups, chaining, and fallback recovery.
Key components:
- Action: wraps a function or coroutine into a standard executable unit.
- ChainedAction: runs actions sequentially, optionally injecting last results.
- ActionGroup: runs actions in parallel and gathers results.
- ProcessAction: executes CPU-bound functions in a separate process.
- LiteralInputAction: injects static values into workflows.
- FallbackAction: gracefully recovers from failures or missing data.
This design promotes clean, fault-tolerant, modular CLI and automation systems.
"""
from __future__ import annotations
@ -14,7 +32,7 @@ import asyncio
import random
from abc import ABC, abstractmethod
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from functools import cached_property, partial
from typing import Any, Callable
from rich.console import Console
@ -22,6 +40,7 @@ from rich.tree import Tree
from falyx.context import ExecutionContext, ResultsContext
from falyx.debug import register_debug_hooks
from falyx.exceptions import EmptyChainError
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.retry import RetryHandler, RetryPolicy
@ -35,11 +54,12 @@ class BaseAction(ABC):
"""
Base class for actions. Actions can be simple functions or more
complex actions like `ChainedAction` or `ActionGroup`. They can also
be run independently or as part of Menu.
be run independently or as part of Falyx.
inject_last_result (bool): Whether to inject the previous action's result into kwargs.
inject_last_result_as (str): The name of the kwarg key to inject the result as
(default: 'last_result').
_requires_injection (bool): Whether the action requires input injection.
"""
def __init__(
self,
@ -55,7 +75,8 @@ class BaseAction(ABC):
self.results_context: ResultsContext | None = None
self.inject_last_result: bool = inject_last_result
self.inject_last_result_as: str = inject_last_result_as
self.requires_injection: bool = False
self._requires_injection: bool = False
self._skip_in_chain: bool = False
if logging_hooks:
register_debug_hooks(self.hooks)
@ -122,7 +143,7 @@ class BaseAction(ABC):
def requires_io_injection(self) -> bool:
"""Checks to see if the action requires input injection."""
return self.requires_injection
return self._requires_injection
def __str__(self):
return f"<{self.__class__.__name__} '{self.name}'>"
@ -132,7 +153,27 @@ class BaseAction(ABC):
class Action(BaseAction):
"""A simple action that runs a callable. It can be a function or a coroutine."""
"""
Action wraps a simple function or coroutine into a standard executable unit.
It supports:
- Optional retry logic.
- Hook lifecycle (before, success, error, after, teardown).
- Last result injection for chaining.
- Optional rollback handlers for undo logic.
Args:
name (str): Name of the action.
action (Callable): The function or coroutine to execute.
rollback (Callable, optional): Rollback function to undo the action.
args (tuple, optional): Static positional arguments.
kwargs (dict, optional): Static keyword arguments.
hooks (HookManager, optional): Hook manager for lifecycle events.
inject_last_result (bool, optional): Enable last_result injection.
inject_last_result_as (str, optional): Name of injected key.
retry (bool, optional): Whether to enable retries.
retry_policy (RetryPolicy, optional): Retry settings.
"""
def __init__(
self,
name: str,
@ -147,7 +188,7 @@ class Action(BaseAction):
retry_policy: RetryPolicy | None = None,
) -> None:
super().__init__(name, hooks, inject_last_result, inject_last_result_as)
self.action = ensure_async(action)
self.action = action
self.rollback = rollback
self.args = args
self.kwargs = kwargs or {}
@ -156,9 +197,17 @@ class Action(BaseAction):
if retry or (retry_policy and retry_policy.enabled):
self.enable_retry()
@property
def action(self) -> Callable[..., Any]:
return self._action
@action.setter
def action(self, value: Callable[..., Any]):
self._action = ensure_async(value)
def enable_retry(self):
"""Enable retry with the existing retry policy."""
self.retry_policy.enabled = True
self.retry_policy.enable_policy()
logger.debug(f"[Action:{self.name}] Registering retry handler")
handler = RetryHandler(self.retry_policy)
self.hooks.register(HookType.ON_ERROR, handler.retry_on_error)
@ -166,6 +215,7 @@ class Action(BaseAction):
def set_retry_policy(self, policy: RetryPolicy):
"""Set a new retry policy and re-register the handler."""
self.retry_policy = policy
if policy.enabled:
self.enable_retry()
async def _run(self, *args, **kwargs) -> Any:
@ -213,12 +263,64 @@ class Action(BaseAction):
else:
console.print(Tree("".join(label)))
def __str__(self):
return f"Action(name={self.name}, action={self.action.__name__})"
class LiteralInputAction(Action):
"""
LiteralInputAction injects a static value into a ChainedAction.
This allows embedding hardcoded values mid-pipeline, useful when:
- Providing default or fallback inputs.
- Starting a pipeline with a fixed input.
- Supplying missing context manually.
Args:
value (Any): The static value to inject.
"""
def __init__(self, value: Any):
self._value = value
async def literal(*args, **kwargs):
return value
super().__init__("Input", literal, inject_last_result=True)
super().__init__("Input", literal)
@cached_property
def value(self) -> Any:
"""Return the literal value."""
return self._value
def __str__(self) -> str:
return f"LiteralInputAction(value={self.value})"
class FallbackAction(Action):
"""
FallbackAction provides a default value if the previous action failed or returned None.
It injects the last result and checks:
- If last_result is not None, it passes it through unchanged.
- If last_result is None (e.g., due to failure), it replaces it with a fallback value.
Used in ChainedAction pipelines to gracefully recover from errors or missing data.
When activated, it consumes the preceding error and allows the chain to continue normally.
Args:
fallback (Any): The fallback value to use if last_result is None.
"""
def __init__(self, fallback: Any):
self._fallback = fallback
async def _fallback_logic(last_result):
return last_result if last_result is not None else fallback
super().__init__(name="Fallback", action=_fallback_logic, inject_last_result=True)
@cached_property
def fallback(self) -> Any:
"""Return the fallback value."""
return self._fallback
def __str__(self) -> str:
return f"FallbackAction(fallback={self.fallback})"
class ActionListMixin:
@ -253,7 +355,26 @@ class ActionListMixin:
class ChainedAction(BaseAction, ActionListMixin):
"""A ChainedAction is a sequence of actions that are executed in order."""
"""
ChainedAction executes a sequence of actions one after another.
Features:
- Supports optional automatic last_result injection (auto_inject).
- Recovers from intermediate errors using FallbackAction if present.
- Rolls back all previously executed actions if a failure occurs.
- Handles literal values with LiteralInputAction.
Best used for defining robust, ordered workflows where each step can depend on previous results.
Args:
name (str): Name of the chain.
actions (list): List of actions or literals to execute.
hooks (HookManager, optional): Hooks for lifecycle events.
inject_last_result (bool, optional): Whether to inject last results into kwargs by default.
inject_last_result_as (str, optional): Key name for injection.
auto_inject (bool, optional): Auto-enable injection for subsequent actions.
return_list (bool, optional): Whether to return a list of all results. False returns the last result.
"""
def __init__(
self,
name: str,
@ -262,28 +383,28 @@ class ChainedAction(BaseAction, ActionListMixin):
inject_last_result: bool = False,
inject_last_result_as: str = "last_result",
auto_inject: bool = False,
return_list: bool = False,
) -> None:
super().__init__(name, hooks, inject_last_result, inject_last_result_as)
ActionListMixin.__init__(self)
self.auto_inject = auto_inject
self.return_list = return_list
if actions:
self.set_actions(actions)
def _wrap_literal_if_needed(self, action: BaseAction | Any) -> BaseAction:
return LiteralInputAction(action) if not isinstance(action, BaseAction) else action
def _apply_auto_inject(self, action: BaseAction) -> None:
if self.auto_inject and not action.inject_last_result:
action.inject_last_result = True
def set_actions(self, actions: list[BaseAction | Any]):
self.actions.clear()
for action in actions:
def add_action(self, action: BaseAction | Any) -> None:
action = self._wrap_literal_if_needed(action)
self._apply_auto_inject(action)
self.add_action(action)
if self.actions and self.auto_inject and not action.inject_last_result:
action.inject_last_result = True
super().add_action(action)
async def _run(self, *args, **kwargs) -> list[Any]:
if not self.actions:
raise EmptyChainError(f"[{self.name}] No actions to execute.")
results_context = ResultsContext(name=self.name)
if self.results_context:
results_context.add_result(self.results_context.last_result())
@ -300,18 +421,35 @@ class ChainedAction(BaseAction, ActionListMixin):
await self.hooks.trigger(HookType.BEFORE, context)
for index, action in enumerate(self.actions):
if action._skip_in_chain:
logger.debug("[%s] ⚠️ Skipping consumed action '%s'", self.name, action.name)
continue
results_context.current_index = index
prepared = action.prepare_for_chain(results_context)
last_result = results_context.last_result()
try:
if self.requires_io_injection() and last_result is not None:
result = await prepared(**{prepared.inject_last_result_as: last_result})
else:
result = await prepared(*args, **updated_kwargs)
except Exception as error:
if index + 1 < len(self.actions) and isinstance(self.actions[index + 1], FallbackAction):
logger.warning("[%s] ⚠️ Fallback triggered: %s, recovering with fallback '%s'.",
self.name, error, self.actions[index + 1].name)
results_context.add_result(None)
context.extra["results"].append(None)
fallback = self.actions[index + 1].prepare_for_chain(results_context)
result = await fallback()
fallback._skip_in_chain = True
else:
raise
results_context.add_result(result)
context.extra["results"].append(result)
context.extra["rollback_stack"].append(prepared)
context.result = context.extra["results"]
all_results = context.extra["results"]
assert all_results, f"[{self.name}] No results captured. Something seriously went wrong."
context.result = all_results if self.return_list else all_results[-1]
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result
@ -328,6 +466,18 @@ class ChainedAction(BaseAction, ActionListMixin):
er.record(context)
async def _rollback(self, rollback_stack, *args, **kwargs):
"""
Roll back all executed actions in reverse order.
Rollbacks run even if a fallback recovered from failure,
ensuring consistent undo of all side effects.
Actions without rollback handlers are skipped.
Args:
rollback_stack (list): Actions to roll back.
*args, **kwargs: Passed to rollback handlers.
"""
for action in reversed(rollback_stack):
rollback = getattr(action, "rollback", None)
if rollback:
@ -355,7 +505,37 @@ class ChainedAction(BaseAction, ActionListMixin):
class ActionGroup(BaseAction, ActionListMixin):
"""An ActionGroup is a collection of actions that can be run in parallel."""
"""
ActionGroup executes multiple actions concurrently in parallel.
It is ideal for independent tasks that can be safely run simultaneously,
improving overall throughput and responsiveness of workflows.
Core features:
- Parallel execution of all contained actions.
- Shared last_result injection across all actions if configured.
- Aggregated collection of individual results as (name, result) pairs.
- Hook lifecycle support (before, on_success, on_error, after, on_teardown).
- Error aggregation: captures all action errors and reports them together.
Behavior:
- If any action fails, the group collects the errors but continues executing
other actions without interruption.
- After all actions complete, ActionGroup raises a single exception summarizing
all failures, or returns all results if successful.
Best used for:
- Batch processing multiple independent tasks.
- Reducing latency for workflows with parallelizable steps.
- Isolating errors while maximizing successful execution.
Args:
name (str): Name of the chain.
actions (list): List of actions or literals to execute.
hooks (HookManager, optional): Hooks for lifecycle events.
inject_last_result (bool, optional): Whether to inject last results into kwargs by default.
inject_last_result_as (str, optional): Key name for injection.
"""
def __init__(
self,
name: str,
@ -436,7 +616,25 @@ class ActionGroup(BaseAction, ActionListMixin):
class ProcessAction(BaseAction):
"""A ProcessAction runs a function in a separate process using ProcessPoolExecutor."""
"""
ProcessAction runs a function in a separate process using ProcessPoolExecutor.
Features:
- Executes CPU-bound or blocking tasks without blocking the main event loop.
- Supports last_result injection into the subprocess.
- Validates that last_result is pickleable when injection is enabled.
Args:
name (str): Name of the action.
func (Callable): Function to execute in a new process.
args (tuple, optional): Positional arguments.
kwargs (dict, optional): Keyword arguments.
hooks (HookManager, optional): Hook manager for lifecycle events.
executor (ProcessPoolExecutor, optional): Custom executor if desired.
inject_last_result (bool, optional): Inject last result into the function.
inject_last_result_as (str, optional): Name of the injected key.
"""
def __init__(
self,
name: str,

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""bottom_bar.py"""
from typing import Any, Callable
@ -8,7 +9,7 @@ from rich.console import Console
from falyx.options_manager import OptionsManager
from falyx.themes.colors import OneColors
from falyx.utils import CaseInsensitiveDict
from falyx.utils import CaseInsensitiveDict, chunks
class BottomBar:
@ -211,5 +212,8 @@ class BottomBar:
def render(self):
"""Render the bottom bar."""
return merge_formatted_text([fn() for fn in self._named_items.values()])
lines = []
for chunk in chunks(self._named_items.values(), self.columns):
lines.extend([fn for fn in chunk])
lines.append(lambda: HTML("\n"))
return merge_formatted_text([fn() for fn in lines[:-1]])

View File

@ -1,14 +1,24 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""command.py
Any Action or Command is callable and supports the signature:
result = thing(*args, **kwargs)
This guarantees:
- Hook lifecycle (before/after/error/teardown)
- Timing
- Consistent return values
Defines the Command class for Falyx CLI.
Commands are callable units representing a menu option or CLI task,
wrapping either a BaseAction or a simple function. They provide:
- Hook lifecycle (before, on_success, on_error, after, on_teardown)
- Execution timing and duration tracking
- Retry logic (single action or recursively through action trees)
- Confirmation prompts and spinner integration
- Result capturing and summary logging
- Rich-based preview for CLI display
Every Command is self-contained, configurable, and plays a critical role
in building robust interactive menus.
"""
from __future__ import annotations
from functools import cached_property
from typing import Any, Callable
from prompt_toolkit.formatted_text import FormattedText
@ -16,11 +26,12 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from rich.console import Console
from rich.tree import Tree
from falyx.action import Action, BaseAction
from falyx.action import Action, ActionGroup, BaseAction, ChainedAction
from falyx.context import ExecutionContext
from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.io_action import BaseIOAction
from falyx.retry import RetryPolicy
from falyx.themes.colors import OneColors
from falyx.utils import _noop, ensure_async, logger
@ -29,13 +40,63 @@ console = Console()
class Command(BaseModel):
"""Class representing an command in the menu."""
"""
Represents a selectable command in a Falyx menu system.
A Command wraps an executable action (function, coroutine, or BaseAction)
and enhances it with:
- Lifecycle hooks (before, success, error, after, teardown)
- Retry support (single action or recursive for chained/grouped actions)
- Confirmation prompts for safe execution
- Spinner visuals during execution
- Tagging for categorization and filtering
- Rich-based CLI previews
- Result tracking and summary reporting
Commands are built to be flexible yet robust, enabling dynamic CLI workflows
without sacrificing control or reliability.
Attributes:
key (str): Primary trigger key for the command.
description (str): Short description for the menu display.
hidden (bool): Toggles visibility in the menu.
aliases (list[str]): Alternate keys or phrases.
action (BaseAction | Callable): The executable logic.
args (tuple): Static positional arguments.
kwargs (dict): Static keyword arguments.
help_text (str): Additional help or guidance text.
color (str): Color theme for CLI rendering.
confirm (bool): Whether to require confirmation before executing.
confirm_message (str): Custom confirmation prompt.
preview_before_confirm (bool): Whether to preview before confirming.
spinner (bool): Whether to show a spinner during execution.
spinner_message (str): Spinner text message.
spinner_type (str): Spinner style (e.g., dots, line, etc.).
spinner_style (str): Color or style of the spinner.
spinner_kwargs (dict): Extra spinner configuration.
hooks (HookManager): Hook manager for lifecycle events.
retry (bool): Enable retry on failure.
retry_all (bool): Enable retry across chained or grouped actions.
retry_policy (RetryPolicy): Retry behavior configuration.
tags (list[str]): Organizational tags for the command.
logging_hooks (bool): Whether to attach logging hooks automatically.
requires_input (bool | None): Indicates if the action needs input.
Methods:
__call__(): Executes the command, respecting hooks and retries.
preview(): Rich tree preview of the command.
confirmation_prompt(): Formatted prompt for confirmation.
result: Property exposing the last result.
log_summary(): Summarizes execution details to the console.
"""
key: str
description: str
aliases: list[str] = Field(default_factory=list)
action: BaseAction | Callable[[], Any] = _noop
args: tuple = ()
kwargs: dict[str, Any] = Field(default_factory=dict)
hidden: bool = False
aliases: list[str] = Field(default_factory=list)
help_text: str = ""
color: str = OneColors.WHITE
confirm: bool = False
@ -52,6 +113,7 @@ class Command(BaseModel):
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
tags: list[str] = Field(default_factory=list)
logging_hooks: bool = False
requires_input: bool | None = None
_context: ExecutionContext | None = PrivateAttr(default=None)
@ -65,12 +127,32 @@ class Command(BaseModel):
self.action.set_retry_policy(self.retry_policy)
elif self.retry:
logger.warning(f"[Command:{self.key}] Retry requested, but action is not an Action instance.")
if self.retry_all:
if self.retry_all and isinstance(self.action, BaseAction):
self.retry_policy.enabled = True
self.action.enable_retries_recursively(self.action, self.retry_policy)
elif self.retry_all:
logger.warning(f"[Command:{self.key}] Retry all requested, but action is not a BaseAction instance.")
if self.logging_hooks and isinstance(self.action, BaseAction):
register_debug_hooks(self.action.hooks)
if self.requires_input is None and self.detect_requires_input:
self.requires_input = True
self.hidden = True
elif self.requires_input is None:
self.requires_input = False
@cached_property
def detect_requires_input(self) -> bool:
"""Detect if the action requires input based on its type."""
if isinstance(self.action, BaseIOAction):
return True
elif isinstance(self.action, ChainedAction):
return isinstance(self.action.actions[0], BaseIOAction) if self.action.actions else False
elif isinstance(self.action, ActionGroup):
return any(isinstance(action, BaseIOAction) for action in self.action.actions)
return False
@field_validator("action", mode="before")
@classmethod
def wrap_callable_as_async(cls, action: Any) -> Any:
@ -81,7 +163,8 @@ class Command(BaseModel):
raise TypeError("Action must be a callable or an instance of BaseAction")
def __str__(self):
return f"Command(key='{self.key}', description='{self.description}')"
return (f"Command(key='{self.key}', description='{self.description}' "
f"action='{self.action}')")
async def __call__(self, *args, **kwargs):
"""Run the action with full hook lifecycle, timing, and error handling."""

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""config.py
Configuration loader for Falyx CLI commands."""
@ -79,11 +80,12 @@ def loader(file_path: str) -> list[dict[str, Any]]:
command_dict = {
"key": entry["key"],
"description": entry["description"],
"aliases": entry.get("aliases", []),
"action": wrap_if_needed(import_action(entry["action"]),
name=entry["description"]),
"args": tuple(entry.get("args", ())),
"kwargs": entry.get("kwargs", {}),
"hidden": entry.get("hidden", False),
"aliases": entry.get("aliases", []),
"help_text": entry.get("help_text", ""),
"color": entry.get("color", "white"),
"confirm": entry.get("confirm", False),
@ -94,10 +96,18 @@ def loader(file_path: str) -> list[dict[str, Any]]:
"spinner_type": entry.get("spinner_type", "dots"),
"spinner_style": entry.get("spinner_style", "cyan"),
"spinner_kwargs": entry.get("spinner_kwargs", {}),
"tags": entry.get("tags", []),
"before_hooks": entry.get("before_hooks", []),
"success_hooks": entry.get("success_hooks", []),
"error_hooks": entry.get("error_hooks", []),
"after_hooks": entry.get("after_hooks", []),
"teardown_hooks": entry.get("teardown_hooks", []),
"retry": entry.get("retry", False),
"retry_all": entry.get("retry_all", False),
"retry_policy": RetryPolicy(**entry.get("retry_policy", {})),
"tags": entry.get("tags", []),
"logging_hooks": entry.get("logging_hooks", False),
"requires_input": entry.get("requires_input", None),
}
commands.append(command_dict)
return commands

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""context.py"""
import time
from datetime import datetime

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
from falyx.context import ExecutionContext
from falyx.hook_manager import HookManager, HookType
from falyx.utils import logger

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
class FalyxError(Exception):
"""Custom exception for the Menu class."""
@ -20,3 +21,7 @@ class NotAFalyxError(FalyxError):
class CircuitBreakerOpen(FalyxError):
"""Exception raised when the circuit breaker is open."""
class EmptyChainError(FalyxError):
"""Exception raised when the chain is empty."""

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""execution_registry.py"""
from collections import defaultdict
from datetime import datetime
@ -62,6 +63,8 @@ class ExecutionRegistry:
else:
status = "[green]✅ Success"
result = repr(ctx.result)
if len(result) > 1000:
result = f"{result[:1000]}..."
table.add_row(ctx.name, start, end, duration, status, result)

View File

@ -1,16 +1,23 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""falyx.py
This class creates a Falyx object that creates a selectable menu
with customizable commands and functionality.
Main class for constructing and running Falyx CLI menus.
It allows for adding commands, and their accompanying actions,
and provides a method to display the menu and handle user input.
Falyx provides a structured, customizable interactive menu system
for running commands, actions, and workflows. It supports:
This class uses the `rich` library to display the menu in a
formatted and visually appealing way.
- Hook lifecycle management (before/on_success/on_error/after/on_teardown)
- Dynamic command addition and alias resolution
- Rich-based menu display with multi-column layouts
- Interactive input validation and auto-completion
- History tracking and help menu generation
- Confirmation prompts and spinners
- Headless mode for automated script execution
- CLI argument parsing with argparse integration
- Retry policy configuration for actions
This class also uses the `prompt_toolkit` library to handle
user input and create an interactive experience.
Falyx enables building flexible, robust, and user-friendly
terminal applications with minimal boilerplate.
"""
import asyncio
import logging
@ -30,7 +37,7 @@ from rich.console import Console
from rich.markdown import Markdown
from rich.table import Table
from falyx.action import BaseAction
from falyx.action import Action, BaseAction
from falyx.bottom_bar import BottomBar
from falyx.command import Command
from falyx.context import ExecutionContext
@ -43,37 +50,57 @@ from falyx.options_manager import OptionsManager
from falyx.parsers import get_arg_parsers
from falyx.retry import RetryPolicy
from falyx.themes.colors import OneColors, get_nord_theme
from falyx.utils import CaseInsensitiveDict, async_confirm, chunks, logger
from falyx.utils import (CaseInsensitiveDict, async_confirm, chunks,
get_program_invocation, logger)
from falyx.version import __version__
class Falyx:
"""Class to create a menu with commands.
Hook functions must have the signature:
def hook(command: Command) -> None:
where `command` is the selected command.
Error hook functions must have the signature:
def error_hook(command: Command, error: Exception) -> None:
where `command` is the selected command and `error` is the exception raised.
Hook execution order:
1. Before action hooks of the menu.
2. Before action hooks of the selected command.
3. Action of the selected command.
4. After action hooks of the selected command.
5. After action hooks of the menu.
6. On error hooks of the selected command (if an error occurs).
7. On error hooks of the menu (if an error occurs).
Parameters:
title (str|Markdown): The title of the menu.
columns (int): The number of columns to display the commands in.
prompt (AnyFormattedText): The prompt to display when asking for input.
bottom_bar (str|callable|None): The text to display in the bottom bar.
"""
Main menu controller for Falyx CLI applications.
Falyx orchestrates the full lifecycle of an interactive menu system,
handling user input, command execution, error recovery, and structured
CLI workflows.
Key Features:
- Interactive menu with Rich rendering and Prompt Toolkit input handling
- Dynamic command management with alias and abbreviation matching
- Full lifecycle hooks (before, success, error, after, teardown) at both menu and command levels
- Built-in retry support, spinner visuals, and confirmation prompts
- Submenu nesting and action chaining
- History tracking, help generation, and headless execution modes
- Seamless CLI argument parsing and integration via argparse
- Extensible with user-defined hooks, bottom bars, and custom layouts
Args:
title (str | Markdown): Title displayed for the menu.
prompt (AnyFormattedText): Prompt displayed when requesting user input.
columns (int): Number of columns to use when rendering menu commands.
bottom_bar (BottomBar | str | Callable | None): Bottom toolbar content or logic.
welcome_message (str | Markdown | dict): Welcome message shown at startup.
exit_message (str | Markdown | dict): Exit message shown on shutdown.
key_bindings (KeyBindings | None): Custom Prompt Toolkit key bindings.
include_history_command (bool): Whether to add a built-in history viewer command.
include_help_command (bool): Whether to add a built-in help viewer command.
confirm_on_error (bool): Whether to prompt the user after errors.
never_confirm (bool): Whether to skip confirmation prompts entirely.
always_confirm (bool): Whether to force confirmation prompts for all actions.
cli_args (Namespace | None): Parsed CLI arguments, usually from argparse.
options (OptionsManager | None): Declarative option mappings.
custom_table (Callable[[Falyx], Table] | Table | None): Custom menu table generator.
Methods:
run(): Main entry point for CLI argument-based workflows. Most users will use this.
menu(): Run the interactive menu loop.
headless(command_key, return_context): Run a command directly without showing the menu.
add_command(): Add a single command to the menu.
add_commands(): Add multiple commands at once.
register_all_hooks(): Register hooks across all commands and submenus.
debug_hooks(): Log hook registration for debugging.
build_default_table(): Construct the standard Rich table layout.
"""
def __init__(
self,
title: str | Markdown = "Menu",
@ -84,7 +111,7 @@ class Falyx:
exit_message: str | Markdown | dict[str, Any] = "",
key_bindings: KeyBindings | None = None,
include_history_command: bool = True,
include_help_command: bool = False,
include_help_command: bool = True,
confirm_on_error: bool = True,
never_confirm: bool = False,
always_confirm: bool = False,
@ -207,6 +234,8 @@ class Falyx:
for command in self.commands.values():
help_text = command.help_text or command.description
if command.requires_input:
help_text += " [dim](requires input)[/dim]"
table.add_row(
f"[{command.color}]{command.key}[/]",
", ".join(command.aliases) if command.aliases else "None",
@ -234,7 +263,7 @@ class Falyx:
"Show this help menu"
)
self.console.print(table)
self.console.print(table, justify="center")
def _get_help_command(self) -> Command:
"""Returns the help command for the menu."""
@ -324,13 +353,12 @@ class Falyx:
def bottom_bar(self, bottom_bar: BottomBar | str | Callable[[], Any] | None) -> None:
"""Sets the bottom bar for the menu."""
if bottom_bar is None:
self._bottom_bar = BottomBar(self.columns, self.key_bindings, key_validator=self.is_key_available)
self._bottom_bar: BottomBar | str | Callable[[], Any] = BottomBar(self.columns, self.key_bindings, key_validator=self.is_key_available)
elif isinstance(bottom_bar, BottomBar):
bottom_bar.key_validator = self.is_key_available
bottom_bar.key_bindings = self.key_bindings
self._bottom_bar = bottom_bar
elif (isinstance(bottom_bar, str) or
callable(bottom_bar)):
elif (isinstance(bottom_bar, str) or callable(bottom_bar)):
self._bottom_bar = bottom_bar
else:
raise FalyxError("Bottom bar must be a string, callable, or BottomBar instance.")
@ -339,12 +367,12 @@ class Falyx:
def _get_bottom_bar_render(self) -> Callable[[], Any] | str | None:
"""Returns the bottom bar for the menu."""
if isinstance(self.bottom_bar, BottomBar) and self.bottom_bar._named_items:
return self._bottom_bar.render
elif callable(self._bottom_bar):
return self._bottom_bar
elif isinstance(self._bottom_bar, str):
return self._bottom_bar
elif self._bottom_bar is None:
return self.bottom_bar.render
elif callable(self.bottom_bar):
return self.bottom_bar
elif isinstance(self.bottom_bar, str):
return self.bottom_bar
elif self.bottom_bar is None:
return None
return None
@ -475,9 +503,10 @@ class Falyx:
key: str,
description: str,
action: BaseAction | Callable[[], Any],
aliases: list[str] | None = None,
args: tuple = (),
kwargs: dict[str, Any] = {},
hidden: bool = False,
aliases: list[str] | None = None,
help_text: str = "",
color: str = OneColors.WHITE,
confirm: bool = False,
@ -491,25 +520,27 @@ class Falyx:
hooks: HookManager | None = None,
before_hooks: list[Callable] | None = None,
success_hooks: list[Callable] | None = None,
after_hooks: list[Callable] | None = None,
error_hooks: list[Callable] | None = None,
after_hooks: list[Callable] | None = None,
teardown_hooks: list[Callable] | None = None,
tags: list[str] | None = None,
logging_hooks: bool = False,
retry: bool = False,
retry_all: bool = False,
retry_policy: RetryPolicy | None = None,
requires_input: bool | None = None,
) -> Command:
"""Adds an command to the menu, preventing duplicates."""
self._validate_command_key(key)
command = Command(
key=key,
description=description,
aliases=aliases if aliases else [],
help_text=help_text,
action=action,
args=args,
kwargs=kwargs,
hidden=hidden,
aliases=aliases if aliases else [],
help_text=help_text,
color=color,
confirm=confirm,
confirm_message=confirm_message,
@ -524,6 +555,7 @@ class Falyx:
retry=retry,
retry_all=retry_all,
retry_policy=retry_policy or RetryPolicy(),
requires_input=requires_input,
)
if hooks:
@ -558,13 +590,15 @@ class Falyx:
def build_default_table(self) -> Table:
"""Build the standard table layout. Developers can subclass or call this in custom tables."""
table = Table(title=self.title, show_header=False, box=box.SIMPLE, expand=True)
for chunk in chunks(self.commands.items(), self.columns):
visible_commands = [item for item in self.commands.items() if not item[1].hidden]
for chunk in chunks(visible_commands, self.columns):
row = []
for key, command in chunk:
row.append(f"[{key}] [{command.color}]{command.description}")
table.add_row(*row)
bottom_row = self.get_bottom_row()
table.add_row(*bottom_row)
for row in chunks(bottom_row, self.columns):
table.add_row(*row)
return table
@property
@ -617,9 +651,9 @@ class Falyx:
confirm_answer = await async_confirm(selected_command.confirmation_prompt)
if confirm_answer:
logger.info(f"[{OneColors.LIGHT_YELLOW}][{selected_command.description}]🔐 confirmed.")
logger.info(f"[{selected_command.description}]🔐 confirmed.")
else:
logger.info(f"[{OneColors.DARK_RED}][{selected_command.description}]❌ cancelled.")
logger.info(f"[{selected_command.description}]❌ cancelled.")
return confirm_answer
return True
@ -658,8 +692,19 @@ class Falyx:
choice = await self.session.prompt_async()
selected_command = self.get_command(choice)
if not selected_command:
logger.info(f"[{OneColors.LIGHT_YELLOW}] Invalid command '{choice}'.")
logger.info(f"Invalid command '{choice}'.")
return True
if selected_command.requires_input:
program = get_program_invocation()
self.console.print(
f"[{OneColors.LIGHT_YELLOW}]⚠️ Command '{selected_command.key}' requires input "
f"and must be run via [{OneColors.MAGENTA}]'{program} run'[{OneColors.LIGHT_YELLOW}] "
"with proper piping or arguments.[/]"
)
return True
self.last_run_command = selected_command
if selected_command == self.exit_command:
@ -667,7 +712,7 @@ class Falyx:
return False
if not await self._should_run_action(selected_command):
logger.info(f"[{OneColors.DARK_RED}] {selected_command.description} cancelled.")
logger.info(f"{selected_command.description} cancelled.")
return True
context = self._create_context(selected_command)
@ -750,7 +795,10 @@ class Falyx:
selected_command.retry_policy.delay = self.cli_args.retry_delay
if self.cli_args.retry_backoff:
selected_command.retry_policy.backoff = self.cli_args.retry_backoff
#selected_command.update_retry_policy(selected_command.retry_policy)
if isinstance(selected_command.action, Action):
selected_command.action.set_retry_policy(selected_command.retry_policy)
else:
logger.warning(f"[Command:{selected_command.key}] Retry requested, but action is not an Action instance.")
def print_message(self, message: str | Markdown | dict[str, Any]) -> None:
"""Prints a message to the console."""
@ -773,14 +821,14 @@ class Falyx:
if self.welcome_message:
self.print_message(self.welcome_message)
while True:
self.console.print(self.table)
self.console.print(self.table, justify="center")
try:
task = asyncio.create_task(self.process_command())
should_continue = await task
if not should_continue:
break
except (EOFError, KeyboardInterrupt):
logger.info(f"[{OneColors.DARK_RED}]EOF or KeyboardInterrupt. Exiting menu.")
logger.info("EOF or KeyboardInterrupt. Exiting menu.")
break
logger.info(f"Exiting menu: {self.get_title()}")
if self.exit_message:

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""hook_manager.py"""
from __future__ import annotations
@ -64,5 +65,6 @@ class HookManager:
f" for '{context.name}': {hook_error}")
if hook_type == HookType.ON_ERROR:
assert isinstance(context.exception, BaseException)
assert isinstance(context.exception, Exception), "Context exception should be set for ON_ERROR hook"
raise context.exception from hook_error

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""hooks.py"""
import time

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""importer.py"""
import importlib

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""io_action.py"""
import asyncio
import subprocess
@ -12,8 +13,8 @@ from falyx.context import ExecutionContext
from falyx.exceptions import FalyxError
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.utils import logger
from falyx.themes.colors import OneColors
from falyx.utils import logger
console = Console()
@ -34,7 +35,7 @@ class BaseIOAction(BaseAction):
inject_last_result=inject_last_result,
)
self.mode = mode
self.requires_injection = True
self._requires_injection = True
def from_input(self, raw: str | bytes) -> Any:
raise NotImplementedError
@ -178,3 +179,29 @@ class ShellAction(BaseIOAction):
parent.add("".join(label))
else:
console.print(Tree("".join(label)))
class GrepAction(BaseIOAction):
def __init__(self, name: str, pattern: str, **kwargs):
super().__init__(name=name, **kwargs)
self.pattern = pattern
def from_input(self, raw: str | bytes) -> str:
if not isinstance(raw, (str, bytes)):
raise TypeError(f"{self.name} expected str or bytes input, got {type(raw).__name__}")
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
async def _run(self, parsed_input: str) -> str:
command = ["grep", "-n", self.pattern]
process = subprocess.Popen(
command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout, stderr = process.communicate(input=parsed_input)
if process.returncode == 1:
return ""
if process.returncode != 0:
raise RuntimeError(stderr.strip())
return stdout.strip()
def to_output(self, result: str) -> str:
return result

View File

@ -1,88 +0,0 @@
import asyncio
import logging
from rich.markdown import Markdown
from falyx import Action, Falyx
from falyx.hook_manager import HookType
from falyx.debug import log_before, log_success, log_error, log_after
from falyx.themes.colors import OneColors
from falyx.utils import setup_logging
# Setup logging
setup_logging(console_log_level=logging.WARNING, json_log_to_file=True)
def main():
# Create the menu
menu = Falyx(
title=Markdown("# 🚀 Falyx CLI Demo"),
welcome_message="Welcome to Falyx!",
exit_message="Thanks for using Falyx!",
include_history_command=True,
include_help_command=True,
)
# Define async actions
async def hello():
print("👋 Hello from Falyx CLI!")
def goodbye():
print("👋 Goodbye from Falyx CLI!")
async def do_task_and_increment(counter_name: str = "tasks"):
await asyncio.sleep(3)
print("✅ Task completed.")
menu.bottom_bar.increment_total_counter(counter_name)
# Register global logging hooks
menu.hooks.register(HookType.BEFORE, log_before)
menu.hooks.register(HookType.ON_SUCCESS, log_success)
menu.hooks.register(HookType.ON_ERROR, log_error)
menu.hooks.register(HookType.AFTER, log_after)
# Add a toggle to the bottom bar
menu.add_toggle("D", "Debug Mode", state=False)
# Add a counter to the bottom bar
menu.add_total_counter("tasks", "Tasks", current=0, total=5)
# Add static text to the bottom bar
menu.add_static("env", "🌐 Local Env")
# Add commands with help_text
menu.add_command(
key="S",
description="Say Hello",
help_text="Greets the user with a friendly hello message.",
action=Action("Hello", hello),
color=OneColors.CYAN,
)
menu.add_command(
key="G",
description="Say Goodbye",
help_text="Bids farewell and thanks the user for using the app.",
action=Action("Goodbye", goodbye),
color=OneColors.MAGENTA,
)
menu.add_command(
key="T",
description="Run a Task",
aliases=["task", "run"],
help_text="Performs a task and increments the counter by 1.",
action=do_task_and_increment,
args=("tasks",),
color=OneColors.GREEN,
spinner=True,
)
asyncio.run(menu.run())
if __name__ == "__main__":
"""
Entry point for the Falyx CLI demo application.
This function initializes the menu and runs it.
"""
main()

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""options_manager.py"""
from argparse import Namespace

View File

@ -1,3 +1,4 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""parsers.py
This module contains the argument parsers used for the Falyx CLI.
"""

View File

@ -1,5 +1,7 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""retry.py"""
import asyncio
import random
from pydantic import BaseModel, Field
@ -11,8 +13,16 @@ class RetryPolicy(BaseModel):
max_retries: int = Field(default=3, ge=0)
delay: float = Field(default=1.0, ge=0.0)
backoff: float = Field(default=2.0, ge=1.0)
jitter: float = Field(default=0.0, ge=0.0)
enabled: bool = False
def enable_policy(self) -> None:
"""
Enable the retry policy.
:return: None
"""
self.enabled = True
def is_active(self) -> bool:
"""
Check if the retry policy is active.
@ -25,11 +35,18 @@ class RetryHandler:
def __init__(self, policy: RetryPolicy=RetryPolicy()):
self.policy = policy
def enable_policy(self, backoff=2, max_retries=3, delay=1):
def enable_policy(
self,
max_retries: int=3,
delay: float=1.0,
backoff: float=2.0,
jitter: float=0.0,
):
self.policy.enabled = True
self.policy.max_retries = max_retries
self.policy.delay = delay
self.policy.backoff = backoff
self.policy.jitter = jitter
logger.info(f"🔄 Retry policy enabled: {self.policy}")
async def retry_on_error(self, context: ExecutionContext):
@ -60,7 +77,15 @@ class RetryHandler:
while retries_done < self.policy.max_retries:
retries_done += 1
logger.info(f"[{name}] 🔄 Retrying ({retries_done}/{self.policy.max_retries}) in {current_delay}s due to '{last_error}'...")
sleep_delay = current_delay
if self.policy.jitter > 0:
sleep_delay += random.uniform(-self.policy.jitter, self.policy.jitter)
logger.info(
f"[{name}] 🔄 Retrying ({retries_done}/{self.policy.max_retries}) "
f"in {current_delay}s due to '{last_error}'..."
)
await asyncio.sleep(current_delay)
try:
result = await target.action(*context.args, **context.kwargs)
@ -71,7 +96,10 @@ class RetryHandler:
except Exception as retry_error:
last_error = retry_error
current_delay *= self.policy.backoff
logger.warning(f"[{name}] ⚠️ Retry attempt {retries_done}/{self.policy.max_retries} failed due to '{retry_error}'.")
logger.warning(
f"[{name}] ⚠️ Retry attempt {retries_done}/{self.policy.max_retries} "
f"failed due to '{retry_error}'."
)
context.exception = last_error
logger.error(f"[{name}] ❌ All {self.policy.max_retries} retries failed.")

31
falyx/tagged_table.py Normal file
View File

@ -0,0 +1,31 @@
from collections import defaultdict
from rich import box
from rich.table import Table
from falyx.command import Command
from falyx.falyx import Falyx
def build_tagged_table(flx: Falyx) -> Table:
"""Custom table builder that groups commands by tags."""
table = Table(title=flx.title, show_header=False, box=box.SIMPLE)
# Group commands by first tag
grouped: dict[str, list[Command]] = defaultdict(list)
for cmd in flx.commands.values():
first_tag = cmd.tags[0] if cmd.tags else "Other"
grouped[first_tag.capitalize()].append(cmd)
# Add grouped commands to table
for group_name, commands in grouped.items():
table.add_row(f"[bold underline]{group_name} Commands[/]")
for cmd in commands:
table.add_row(f"[{cmd.key}] [{cmd.color}]{cmd.description}")
table.add_row("")
# Add bottom row
for row in flx.get_bottom_row():
table.add_row(row)
return table

View File

@ -1,8 +1,11 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""utils.py"""
import functools
import inspect
import logging
import os
import shutil
import sys
from itertools import islice
from typing import Any, Awaitable, Callable, TypeVar
@ -21,6 +24,20 @@ T = TypeVar("T")
async def _noop(*args, **kwargs):
pass
def get_program_invocation() -> str:
"""Returns the recommended program invocation prefix."""
script = sys.argv[0]
program = shutil.which(script)
if program:
return os.path.basename(program)
executable = sys.executable
if "python" in executable:
return f"python {script}"
return script
def is_coroutine(function: Callable[..., Any]) -> bool:
return inspect.iscoroutinefunction(function)
@ -32,6 +49,9 @@ def ensure_async(function: Callable[..., T]) -> Callable[..., Awaitable[T]]:
@functools.wraps(function)
async def async_wrapper(*args, **kwargs) -> T:
return function(*args, **kwargs)
if not callable(function):
raise TypeError(f"{function} is not callable")
return async_wrapper

View File

@ -1 +1 @@
__version__ = "0.1.5"
__version__ = "0.1.6"

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "falyx"
version = "0.1.5"
version = "0.1.6"
description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT"

View File

@ -0,0 +1,78 @@
import pytest
from falyx.action import Action, ChainedAction, ActionGroup, FallbackAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.context import ExecutionContext
asyncio_default_fixture_loop_scope = "function"
# --- Helpers ---
async def capturing_hook(context: ExecutionContext):
context.extra["hook_triggered"] = True
# --- Fixtures ---
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
yield
er.clear()
@pytest.mark.asyncio
async def test_action_callable():
"""Test if Action can be created with a callable."""
action = Action("test_action", lambda: "Hello, World!")
result = await action()
assert result == "Hello, World!"
@pytest.mark.asyncio
async def test_action_async_callable():
"""Test if Action can be created with an async callable."""
async def async_callable():
return "Hello, World!"
action = Action("test_action", async_callable)
result = await action()
assert result == "Hello, World!"
@pytest.mark.asyncio
async def test_action_non_callable():
"""Test if Action raises an error when created with a non-callable."""
with pytest.raises(TypeError):
Action("test_action", 42)
@pytest.mark.asyncio
@pytest.mark.parametrize("return_list, expected", [
(True, [1, 2, 3]),
(False, 3),
])
async def test_chained_action_return_modes(return_list, expected):
chain = ChainedAction(
name="Simple Chain",
actions=[
Action(name="one", action=lambda: 1),
Action(name="two", action=lambda: 2),
Action(name="three", action=lambda: 3),
],
return_list=return_list
)
result = await chain()
assert result == expected
@pytest.mark.asyncio
@pytest.mark.parametrize("return_list, auto_inject, expected", [
(True, True, [1, 2, 3]),
(True, False, [1, 2, 3]),
(False, True, 3),
(False, False, 3),
])
async def test_chained_action_literals(return_list, auto_inject, expected):
chain = ChainedAction(
name="Literal Chain",
actions=[1, 2, 3],
return_list=return_list,
auto_inject=auto_inject,
)
result = await chain()
assert result == expected

View File

View File

View File

@ -0,0 +1,40 @@
import pickle
import warnings
import pytest
from falyx.action import ProcessAction
from falyx.execution_registry import ExecutionRegistry as er
# --- Fixtures ---
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
yield
er.clear()
def slow_add(x, y):
return x + y
# --- Tests ---
@pytest.mark.asyncio
async def test_process_action_executes_correctly():
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
action = ProcessAction(name="proc", func=slow_add, args=(2, 3))
result = await action()
assert result == 5
unpickleable = lambda x: x + 1
@pytest.mark.asyncio
async def test_process_action_rejects_unpickleable():
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
action = ProcessAction(name="proc_fail", func=unpickleable, args=(2,))
with pytest.raises(pickle.PicklingError, match="Can't pickle"):
await action()

View File

@ -0,0 +1,30 @@
import pytest
from falyx.action import Action, ChainedAction, ActionGroup, FallbackAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.context import ExecutionContext
asyncio_default_fixture_loop_scope = "function"
# --- Helpers ---
async def capturing_hook(context: ExecutionContext):
context.extra["hook_triggered"] = True
# --- Fixtures ---
@pytest.fixture
def hook_manager():
hm = HookManager()
hm.register(HookType.BEFORE, capturing_hook)
return hm
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
yield
er.clear()
def test_action_enable_retry():
"""Test if Action can be created with retry=True."""
action = Action("test_action", lambda: "Hello, World!", retry=True)
assert action.retry_policy.enabled is True

View File

@ -1,28 +1,17 @@
import pytest
import asyncio
import pickle
import warnings
from falyx.action import Action, ChainedAction, ActionGroup, ProcessAction
from falyx.action import Action, ChainedAction, ActionGroup, FallbackAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.context import ExecutionContext, ResultsContext
from falyx.context import ExecutionContext
asyncio_default_fixture_loop_scope = "function"
# --- Helpers ---
async def dummy_action(x: int = 0) -> int:
return x + 1
async def capturing_hook(context: ExecutionContext):
context.extra["hook_triggered"] = True
# --- Fixtures ---
@pytest.fixture
def sample_action():
return Action(name="increment", action=dummy_action, kwargs={"x": 5})
@pytest.fixture
def hook_manager():
hm = HookManager()
@ -38,15 +27,18 @@ def clean_registry():
# --- Tests ---
@pytest.mark.asyncio
async def test_action_runs_correctly(sample_action):
async def test_action_runs_correctly():
async def dummy_action(x: int = 0) -> int: return x + 1
sample_action = Action(name="increment", action=dummy_action, kwargs={"x": 5})
result = await sample_action()
assert result == 6
@pytest.mark.asyncio
async def test_action_hook_lifecycle(hook_manager):
async def a1(): return 42
action = Action(
name="hooked",
action=lambda: 42,
action=a1,
hooks=hook_manager
)
@ -58,21 +50,30 @@ async def test_action_hook_lifecycle(hook_manager):
@pytest.mark.asyncio
async def test_chained_action_with_result_injection():
async def a1(): return 1
async def a2(last_result): return last_result + 5
async def a3(last_result): return last_result * 2
actions = [
Action(name="start", action=lambda: 1),
Action(name="add_last", action=lambda last_result: last_result + 5, inject_last_result=True),
Action(name="multiply", action=lambda last_result: last_result * 2, inject_last_result=True)
Action(name="start", action=a1),
Action(name="add_last", action=a2, inject_last_result=True),
Action(name="multiply", action=a3, inject_last_result=True)
]
chain = ChainedAction(name="test_chain", actions=actions, inject_last_result=True)
chain = ChainedAction(name="test_chain", actions=actions, inject_last_result=True, return_list=True)
result = await chain()
assert result == [1, 6, 12]
chain = ChainedAction(name="test_chain", actions=actions, inject_last_result=True)
result = await chain()
assert result == 12
@pytest.mark.asyncio
async def test_action_group_runs_in_parallel():
async def a1(): return 1
async def a2(): return 2
async def a3(): return 3
actions = [
Action(name="a", action=lambda: 1),
Action(name="b", action=lambda: 2),
Action(name="c", action=lambda: 3),
Action(name="a", action=a1),
Action(name="b", action=a2),
Action(name="c", action=a3),
]
group = ActionGroup(name="parallel", actions=actions)
result = await group()
@ -81,39 +82,48 @@ async def test_action_group_runs_in_parallel():
@pytest.mark.asyncio
async def test_chained_action_inject_from_action():
async def a1(last_result): return last_result + 10
async def a2(last_result): return last_result + 5
inner_chain = ChainedAction(
name="inner_chain",
actions=[
Action(name="inner_first", action=lambda last_result: last_result + 10, inject_last_result=True),
Action(name="inner_second", action=lambda last_result: last_result + 5, inject_last_result=True),
]
Action(name="inner_first", action=a1, inject_last_result=True),
Action(name="inner_second", action=a2, inject_last_result=True),
],
return_list=True,
)
async def a3(): return 1
async def a4(last_result): return last_result + 2
actions = [
Action(name="first", action=lambda: 1),
Action(name="second", action=lambda last_result: last_result + 2, inject_last_result=True),
Action(name="first", action=a3),
Action(name="second", action=a4, inject_last_result=True),
inner_chain,
]
outer_chain = ChainedAction(name="test_chain", actions=actions)
outer_chain = ChainedAction(name="test_chain", actions=actions, return_list=True)
result = await outer_chain()
assert result == [1, 3, [13, 18]]
@pytest.mark.asyncio
async def test_chained_action_with_group():
async def a1(last_result): return last_result + 1
async def a2(last_result): return last_result + 2
async def a3(): return 3
group = ActionGroup(
name="group",
actions=[
Action(name="a", action=lambda last_result: last_result + 1, inject_last_result=True),
Action(name="b", action=lambda last_result: last_result + 2, inject_last_result=True),
Action(name="c", action=lambda: 3),
Action(name="a", action=a1, inject_last_result=True),
Action(name="b", action=a2, inject_last_result=True),
Action(name="c", action=a3),
]
)
async def a4(): return 1
async def a5(last_result): return last_result + 2
actions = [
Action(name="first", action=lambda: 1),
Action(name="second", action=lambda last_result: last_result + 2, inject_last_result=True),
Action(name="first", action=a4),
Action(name="second", action=a5, inject_last_result=True),
group,
]
chain = ChainedAction(name="test_chain", actions=actions)
chain = ChainedAction(name="test_chain", actions=actions, return_list=True)
result = await chain()
assert result == [1, 3, [("a", 4), ("b", 5), ("c", 3)]]
@ -161,37 +171,21 @@ async def test_chained_action_rollback_on_failure():
assert rollback_called == ["rolled back"]
def slow_add(x, y):
return x + y
@pytest.mark.asyncio
async def test_process_action_executes_correctly():
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
action = ProcessAction(name="proc", func=slow_add, args=(2, 3))
result = await action()
assert result == 5
unpickleable = lambda x: x + 1
@pytest.mark.asyncio
async def test_process_action_rejects_unpickleable():
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
action = ProcessAction(name="proc_fail", func=unpickleable, args=(2,))
with pytest.raises(pickle.PicklingError, match="Can't pickle"):
await action()
@pytest.mark.asyncio
async def test_register_hooks_recursively_propagates():
hook = lambda ctx: ctx.extra.update({"test_marker": True})
def hook(context):
context.extra.update({"test_marker": True})
chain = ChainedAction(name="chain", actions=[
Action(name="a", action=lambda: 1),
Action(name="b", action=lambda: 2),
])
async def a1(): return 1
async def a2(): return 2
chain = ChainedAction(
name="chain",
actions=[
Action(name="a", action=a1),
Action(name="b", action=a2),
],
)
chain.register_hooks_recursively(HookType.BEFORE, hook)
await chain()
@ -217,14 +211,255 @@ async def test_action_hook_recovers_error():
@pytest.mark.asyncio
async def test_action_group_injects_last_result():
async def a1(last_result): return last_result + 10
async def a2(last_result): return last_result + 20
group = ActionGroup(name="group", actions=[
Action(name="g1", action=lambda last_result: last_result + 10, inject_last_result=True),
Action(name="g2", action=lambda last_result: last_result + 20, inject_last_result=True),
Action(name="g1", action=a1, inject_last_result=True),
Action(name="g2", action=a2, inject_last_result=True),
])
chain = ChainedAction(name="with_group", actions=[
Action(name="first", action=lambda: 5),
async def a3(): return 5
chain = ChainedAction(
name="with_group",
actions=[
Action(name="first", action=a3),
group,
])
],
return_list=True,
)
result = await chain()
result_dict = dict(result[1])
assert result_dict == {"g1": 15, "g2": 25}
@pytest.mark.asyncio
async def test_action_inject_last_result():
async def a1(): return 1
async def a2(last_result): return last_result + 1
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2, inject_last_result=True)
chain = ChainedAction(name="chain", actions=[a1, a2])
result = await chain()
assert result == 2
@pytest.mark.asyncio
async def test_action_inject_last_result_fail():
async def a1(): return 1
async def a2(last_result): return last_result + 1
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2)
chain = ChainedAction(name="chain", actions=[a1, a2])
with pytest.raises(TypeError) as exc_info:
await chain()
assert "last_result" in str(exc_info.value)
@pytest.mark.asyncio
async def test_chained_action_auto_inject():
async def a1(): return 1
async def a2(last_result): return last_result + 2
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2)
chain = ChainedAction(name="chain", actions=[a1, a2], auto_inject=True, return_list=True)
result = await chain()
assert result == [1, 3] # a2 receives last_result=1
@pytest.mark.asyncio
async def test_chained_action_no_auto_inject():
async def a1(): return 1
async def a2(): return 2
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2)
chain = ChainedAction(name="no_inject", actions=[a1, a2], auto_inject=False, return_list=True)
result = await chain()
assert result == [1, 2] # a2 does not receive 1
@pytest.mark.asyncio
async def test_chained_action_auto_inject_after_first():
async def a1(): return 1
async def a2(last_result): return last_result + 1
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2)
chain = ChainedAction(name="auto_inject", actions=[a1, a2], auto_inject=True)
result = await chain()
assert result == 2 # a2 receives last_result=1
@pytest.mark.asyncio
async def test_chained_action_with_literal_input():
async def a1(last_result): return last_result + " world"
a1 = Action(name="a1", action=a1)
chain = ChainedAction(name="literal_inject", actions=["hello", a1], auto_inject=True)
result = await chain()
assert result == "hello world" # "hello" is injected as last_result
@pytest.mark.asyncio
async def test_chained_action_manual_inject_override():
async def a1(): return 10
async def a2(last_result): return last_result * 2
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2, inject_last_result=True)
chain = ChainedAction(name="manual_override", actions=[a1, a2], auto_inject=False)
result = await chain()
assert result == 20 # Even without auto_inject, a2 still gets last_result
@pytest.mark.asyncio
async def test_chained_action_with_mid_literal():
async def fetch_data():
# Imagine this is some dynamic API call
return None # Simulate failure or missing data
async def validate_data(last_result):
if last_result is None:
raise ValueError("Missing data!")
return last_result
async def enrich_data(last_result):
return f"Enriched: {last_result}"
chain = ChainedAction(
name="fallback_pipeline",
actions=[
Action(name="FetchData", action=fetch_data),
"default_value", # <-- literal fallback injected mid-chain
Action(name="ValidateData", action=validate_data),
Action(name="EnrichData", action=enrich_data),
],
auto_inject=True,
return_list=True,
)
result = await chain()
assert result == [None, "default_value", "default_value", "Enriched: default_value"]
@pytest.mark.asyncio
async def test_chained_action_with_mid_fallback():
async def fetch_data():
# Imagine this is some dynamic API call
return None # Simulate failure or missing data
async def validate_data(last_result):
if last_result is None:
raise ValueError("Missing data!")
return last_result
async def enrich_data(last_result):
return f"Enriched: {last_result}"
chain = ChainedAction(
name="fallback_pipeline",
actions=[
Action(name="FetchData", action=fetch_data),
FallbackAction(fallback="default_value"),
Action(name="ValidateData", action=validate_data),
Action(name="EnrichData", action=enrich_data),
],
auto_inject=True,
return_list=True,
)
result = await chain()
assert result == [None, "default_value", "default_value", "Enriched: default_value"]
@pytest.mark.asyncio
async def test_chained_action_with_success_mid_fallback():
async def fetch_data():
# Imagine this is some dynamic API call
return "Result" # Simulate success
async def validate_data(last_result):
if last_result is None:
raise ValueError("Missing data!")
return last_result
async def enrich_data(last_result):
return f"Enriched: {last_result}"
chain = ChainedAction(
name="fallback_pipeline",
actions=[
Action(name="FetchData", action=fetch_data),
FallbackAction(fallback="default_value"),
Action(name="ValidateData", action=validate_data),
Action(name="EnrichData", action=enrich_data),
],
auto_inject=True,
return_list=True,
)
result = await chain()
assert result == ["Result", "Result", "Result", "Enriched: Result"]
@pytest.mark.asyncio
async def test_action_group_partial_failure():
async def succeed(): return "ok"
async def fail(): raise ValueError("oops")
group = ActionGroup(name="partial_group", actions=[
Action(name="succeed_action", action=succeed),
Action(name="fail_action", action=fail),
])
with pytest.raises(Exception) as exc_info:
await group()
assert er.get_by_name("succeed_action")[0].result == "ok"
assert er.get_by_name("fail_action")[0].exception is not None
assert "fail_action" in str(exc_info.value)
@pytest.mark.asyncio
async def test_chained_action_with_nested_group():
async def g1(last_result): return last_result + "10"
async def g2(last_result): return last_result + "20"
group = ActionGroup(
name="nested_group",
actions=[
Action(name="g1", action=g1, inject_last_result=True),
Action(name="g2", action=g2, inject_last_result=True),
],
)
chain = ChainedAction(
name="chain_with_group",
actions=[
"start",
group,
],
auto_inject=True,
return_list=True,
)
result = await chain()
# "start" -> group both receive "start" as last_result
assert result[0] == "start"
assert dict(result[1]) == {"g1": "start10", "g2": "start20"} # Assuming string concatenation for example
@pytest.mark.asyncio
async def test_chained_action_double_fallback():
async def fetch_data(last_result=None):
raise ValueError("No data!") # Simulate failure
async def validate_data(last_result):
if last_result is None:
raise ValueError("No data!")
return last_result
async def enrich(last_result):
return f"Enriched: {last_result}"
chain = ChainedAction(
name="fallback_chain",
actions=[
Action(name="Fetch", action=fetch_data),
FallbackAction(fallback="default1"),
Action(name="Validate", action=validate_data),
Action(name="Fetch", action=fetch_data),
FallbackAction(fallback="default2"),
Action(name="Enrich", action=enrich),
],
auto_inject=True,
return_list=True,
)
result = await chain()
assert result == [None, "default1", "default1", None, "default2", "Enriched: default2"]

View File

@ -0,0 +1,27 @@
import pytest
from falyx.action import ChainedAction
from falyx.exceptions import EmptyChainError
@pytest.mark.asyncio
async def test_chained_action_raises_empty_chain_error_when_no_actions():
"""A ChainedAction with no actions should raise an EmptyChainError immediately."""
chain = ChainedAction(name="empty_chain", actions=[])
with pytest.raises(EmptyChainError) as exc_info:
await chain()
assert "No actions to execute." in str(exc_info.value)
assert "empty_chain" in str(exc_info.value)
@pytest.mark.asyncio
async def test_chained_action_raises_empty_chain_error_when_actions_are_none():
"""A ChainedAction with None as actions should raise an EmptyChainError immediately."""
chain = ChainedAction(name="none_chain", actions=None)
with pytest.raises(EmptyChainError) as exc_info:
await chain()
assert "No actions to execute." in str(exc_info.value)
assert "none_chain" in str(exc_info.value)

223
tests/test_command.py Normal file
View File

@ -0,0 +1,223 @@
# test_command.py
import pytest
from falyx.action import Action, ActionGroup, ChainedAction
from falyx.command import Command
from falyx.io_action import BaseIOAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.retry import RetryPolicy
asyncio_default_fixture_loop_scope = "function"
# --- Fixtures ---
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
yield
er.clear()
# --- Dummy Action ---
async def dummy_action():
return "ok"
# --- Dummy IO Action ---
class DummyInputAction(BaseIOAction):
async def _run(self, *args, **kwargs):
return "needs input"
async def preview(self, parent=None):
pass
# --- Tests ---
def test_command_creation():
"""Test if Command can be created with a callable."""
action = Action("test_action", dummy_action)
cmd = Command(
key="TEST",
description="Test Command",
action=action
)
assert cmd.key == "TEST"
assert cmd.description == "Test Command"
assert cmd.action == action
def test_command_str():
"""Test if Command string representation is correct."""
action = Action("test_action", dummy_action)
cmd = Command(
key="TEST",
description="Test Command",
action=action
)
assert str(cmd) == "Command(key='TEST', description='Test Command' action='Action(name=test_action, action=dummy_action)')"
@pytest.mark.parametrize(
"action_factory, expected_requires_input",
[
(lambda: Action(name="normal", action=dummy_action), False),
(lambda: DummyInputAction(name="io"), True),
(lambda: ChainedAction(name="chain", actions=[DummyInputAction(name="io")]), True),
(lambda: ActionGroup(name="group", actions=[DummyInputAction(name="io")]), True),
]
)
def test_command_requires_input_detection(action_factory, expected_requires_input):
action = action_factory()
cmd = Command(
key="TEST",
description="Test Command",
action=action
)
assert cmd.requires_input == expected_requires_input
if expected_requires_input:
assert cmd.hidden is True
else:
assert cmd.hidden is False
def test_requires_input_flag_detected_for_baseioaction():
"""Command should automatically detect requires_input=True for BaseIOAction."""
cmd = Command(
key="X",
description="Echo input",
action=DummyInputAction(name="dummy"),
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_requires_input_manual_override():
"""Command manually set requires_input=False should not auto-hide."""
cmd = Command(
key="Y",
description="Custom input command",
action=DummyInputAction(name="dummy"),
requires_input=False,
)
assert cmd.requires_input is False
assert cmd.hidden is False
def test_default_command_does_not_require_input():
"""Normal Command without IO Action should not require input."""
cmd = Command(
key="Z",
description="Simple action",
action=lambda: 42,
)
assert cmd.requires_input is False
assert cmd.hidden is False
def test_chain_requires_input():
"""If first action in a chain requires input, the command should require input."""
chain = ChainedAction(
name="ChainWithInput",
actions=[
DummyInputAction(name="dummy"),
Action(name="action1", action=lambda: 1),
],
)
cmd = Command(
key="A",
description="Chain with input",
action=chain,
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_group_requires_input():
"""If any action in a group requires input, the command should require input."""
group = ActionGroup(
name="GroupWithInput",
actions=[
Action(name="action1", action=lambda: 1),
DummyInputAction(name="dummy"),
],
)
cmd = Command(
key="B",
description="Group with input",
action=group,
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_enable_retry():
"""Command should enable retry if action is an Action and retry is set to True."""
cmd = Command(
key="A",
description="Retry action",
action=Action(
name="retry_action",
action=lambda: 42,
),
retry=True,
)
assert cmd.retry is True
assert cmd.action.retry_policy.enabled is True
def test_enable_retry_with_retry_policy():
"""Command should enable retry if action is an Action and retry_policy is set."""
retry_policy = RetryPolicy(
max_retries=3,
delay=1,
backoff=2,
enabled=True,
)
cmd = Command(
key="B",
description="Retry action with policy",
action=Action(
name="retry_action_with_policy",
action=lambda: 42,
),
retry_policy=retry_policy,
)
assert cmd.action.retry_policy.enabled is True
assert cmd.action.retry_policy == retry_policy
def test_enable_retry_not_action():
"""Command should not enable retry if action is not an Action."""
cmd = Command(
key="C",
description="Retry action",
action=DummyInputAction,
retry=True,
)
assert cmd.retry is True
with pytest.raises(Exception) as exc_info:
assert cmd.action.retry_policy.enabled is False
assert "'function' object has no attribute 'retry_policy'" in str(exc_info.value)
def test_chain_retry_all():
"""retry_all should retry all Actions inside a ChainedAction recursively."""
chain = ChainedAction(
name="ChainWithRetry",
actions=[
Action(name="action1", action=lambda: 1),
Action(name="action2", action=lambda: 2),
],
)
cmd = Command(
key="D",
description="Chain with retry",
action=chain,
retry_all=True,
)
assert cmd.retry_all is True
assert cmd.retry_policy.enabled is True
assert chain.actions[0].retry_policy.enabled is True
assert chain.actions[1].retry_policy.enabled is True
def test_chain_retry_all_not_base_action():
"""retry_all should not be set if action is not a ChainedAction."""
cmd = Command(
key="E",
description="Chain with retry",
action=DummyInputAction,
retry_all=True,
)
assert cmd.retry_all is True
with pytest.raises(Exception) as exc_info:
assert cmd.action.retry_policy.enabled is False
assert "'function' object has no attribute 'retry_policy'" in str(exc_info.value)

View File

@ -0,0 +1,200 @@
import pytest
import asyncio
from falyx.action import Action, ChainedAction, ActionGroup, FallbackAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType
from falyx.context import ExecutionContext
# --- Fixtures ---
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
yield
er.clear()
# --- Stress Tests ---
@pytest.mark.asyncio
async def test_action_group_partial_failure():
async def succeed():
return "ok"
async def fail():
raise ValueError("oops")
group = ActionGroup(
name="partial_group",
actions=[
Action(name="succeed_action", action=succeed),
Action(name="fail_action", action=fail),
],
)
with pytest.raises(Exception) as exc_info:
await group()
assert "fail_action" in str(exc_info.value)
@pytest.mark.asyncio
async def test_chained_action_with_nested_group():
group = ActionGroup(
name="nested_group",
actions=[
Action(
name="g1",
action=lambda last_result: f"{last_result} + 10",
inject_last_result=True,
),
Action(
name="g2",
action=lambda last_result: f"{last_result} + 20",
inject_last_result=True,
),
],
)
chain = ChainedAction(
name="chain_with_group",
actions=[
"start",
group,
],
auto_inject=True,
return_list=True,
)
result = await chain()
assert result[0] == "start"
result_dict = dict(result[1])
assert result_dict == {"g1": "start + 10", "g2": "start + 20"}
@pytest.mark.asyncio
async def test_chained_action_with_error_mid_fallback():
async def ok():
return 1
async def fail():
raise RuntimeError("bad")
chain = ChainedAction(
name="group_with_fallback",
actions=[
Action(name="ok", action=ok),
Action(name="fail", action=fail),
FallbackAction(fallback="recovered"),
],
return_list=True,
)
result = await chain()
assert result == [1, None, "recovered"]
@pytest.mark.asyncio
async def test_chained_action_double_fallback():
async def fetch_data():
return None
async def validate_data(last_result):
if last_result is None:
raise ValueError("No data!")
return last_result
async def enrich(last_result):
return f"Enriched: {last_result}"
chain = ChainedAction(
name="fallback_chain",
actions=[
Action(name="Fetch", action=fetch_data),
FallbackAction(fallback="default1"),
Action(name="Validate", action=validate_data),
FallbackAction(fallback="default2"),
Action(name="Enrich", action=enrich),
],
auto_inject=True,
return_list=True,
)
result = await chain()
assert result == [None, "default1", "default1", "default1", "Enriched: default1"]
@pytest.mark.asyncio
async def test_large_chain_stress():
chain = ChainedAction(
name="large_chain",
actions=[
Action(
name=f"a{i}",
action=lambda last_result: (
last_result + 1 if last_result is not None else 0
),
inject_last_result=True,
)
for i in range(50)
],
auto_inject=True,
)
result = await chain()
assert result == 49 # Start from 0 and add 1 fifty times
@pytest.mark.asyncio
async def test_nested_chain_inside_group():
inner_chain = ChainedAction(
name="inner",
actions=[
1,
Action(
name="a",
action=lambda last_result: last_result + 1,
inject_last_result=True,
),
Action(
name="b",
action=lambda last_result: last_result + 2,
inject_last_result=True,
),
],
)
group = ActionGroup(
name="outer_group",
actions=[
Action(name="starter", action=lambda: 10),
inner_chain,
],
)
result = await group()
result_dict = dict(result)
assert result_dict["starter"] == 10
assert result_dict["inner"] == 4
@pytest.mark.asyncio
async def test_mixed_sync_async_actions():
async def async_action(last_result):
return last_result + 5
def sync_action(last_result):
return last_result * 2
chain = ChainedAction(
name="mixed_chain",
actions=[
Action(name="start", action=lambda: 1),
Action(name="double", action=sync_action, inject_last_result=True),
Action(name="plus_five", action=async_action, inject_last_result=True),
],
)
result = await chain()
assert result == 7