Linting, pre-commit

This commit is contained in:
2025-05-01 20:26:50 -04:00
parent 4b1a9ef718
commit e91654ca27
33 changed files with 795 additions and 368 deletions

1
.gitignore vendored
View File

@ -15,4 +15,3 @@ build/
.vscode/
coverage.xml
.coverage

View File

@ -8,15 +8,18 @@ from falyx import Action, ActionGroup, ChainedAction
def hello() -> None:
print("Hello, world!")
hello = Action(name="hello_action", action=hello)
# Actions can be run by themselves or as part of a command or pipeline
asyncio.run(hello())
# Actions are designed to be asynchronous first
async def goodbye() -> None:
print("Goodbye!")
goodbye = Action(name="goodbye_action", action=goodbye)
asyncio.run(goodbye())

View File

@ -1,10 +1,12 @@
from rich.console import Console
from falyx import Falyx, ProcessAction
from falyx.themes.colors import NordColors as nc
from rich.console import Console
console = Console()
falyx = Falyx(title="🚀 Process Pool Demo")
def generate_primes(n):
primes = []
for num in range(2, n):
@ -13,6 +15,7 @@ def generate_primes(n):
console.print(f"Generated {len(primes)} primes up to {n}.", style=nc.GREEN)
return primes
# Will not block the event loop
heavy_action = ProcessAction("Prime Generator", generate_primes, args=(100_000,))

View File

@ -6,6 +6,7 @@ from falyx.utils import setup_logging
setup_logging()
# A flaky async step that fails randomly
async def flaky_step():
await asyncio.sleep(0.2)
@ -13,6 +14,7 @@ async def flaky_step():
raise RuntimeError("Random failure!")
return "ok"
# Create a retry handler
step1 = Action(name="step_1", action=flaky_step, retry=True)
step2 = Action(name="step_2", action=flaky_step, retry=True)

View File

@ -4,6 +4,7 @@ Falyx CLI Framework
Copyright (c) 2025 rtj.dev LLC.
Licensed under the MIT License. See LICENSE file for details.
"""
import logging
from .action import Action, ActionGroup, ChainedAction, ProcessAction

View File

@ -4,6 +4,7 @@ Falyx CLI Framework
Copyright (c) 2025 rtj.dev LLC.
Licensed under the MIT License. See LICENSE file for details.
"""
import asyncio
import random
from argparse import Namespace
@ -131,7 +132,7 @@ async def main() -> None:
Action("Clean", foo.clean),
Action("Build", foo.build_package),
Action("Package", foo.package),
]
],
)
flx.add_command(
key="P",
@ -150,7 +151,7 @@ async def main() -> None:
Action("Unit Tests", foo.run_tests),
Action("Integration Tests", foo.run_integration_tests),
Action("Lint", foo.run_linter),
]
],
)
flx.add_command(
key="G",

View File

@ -59,6 +59,7 @@ class BaseAction(ABC):
(default: 'last_result').
_requires_injection (bool): Whether the action requires input injection.
"""
def __init__(
self,
name: str,
@ -156,6 +157,7 @@ class Action(BaseAction):
retry (bool, optional): Enable retry logic.
retry_policy (RetryPolicy, optional): Retry settings.
"""
def __init__(
self,
name: str,
@ -264,10 +266,13 @@ class LiteralInputAction(Action):
Args:
value (Any): The static value to inject.
"""
def __init__(self, value: Any):
self._value = value
async def literal(*args, **kwargs):
return value
super().__init__("Input", literal)
@cached_property
@ -293,10 +298,13 @@ class FallbackAction(Action):
Args:
fallback (Any): The fallback value to use if last_result is None.
"""
def __init__(self, fallback: Any):
self._fallback = fallback
async def _fallback_logic(last_result):
return last_result if last_result is not None else fallback
super().__init__(name="Fallback", action=_fallback_logic, inject_last_result=True)
@cached_property
@ -310,6 +318,7 @@ class FallbackAction(Action):
class ActionListMixin:
"""Mixin for managing a list of actions."""
def __init__(self) -> None:
self.actions: list[BaseAction] = []
@ -360,6 +369,7 @@ class ChainedAction(BaseAction, ActionListMixin):
auto_inject (bool, optional): Auto-enable injection for subsequent actions.
return_list (bool, optional): Whether to return a list of all results. False returns the last result.
"""
def __init__(
self,
name: str,
@ -378,7 +388,9 @@ class ChainedAction(BaseAction, ActionListMixin):
self.set_actions(actions)
def _wrap_literal_if_needed(self, action: BaseAction | Any) -> BaseAction:
return LiteralInputAction(action) if not isinstance(action, BaseAction) else action
return (
LiteralInputAction(action) if not isinstance(action, BaseAction) else action
)
def add_action(self, action: BaseAction | Any) -> None:
action = self._wrap_literal_if_needed(action)
@ -408,23 +420,35 @@ class ChainedAction(BaseAction, ActionListMixin):
for index, action in enumerate(self.actions):
if action._skip_in_chain:
logger.debug("[%s] ⚠️ Skipping consumed action '%s'", self.name, action.name)
logger.debug(
"[%s] ⚠️ Skipping consumed action '%s'", self.name, action.name
)
continue
shared_context.current_index = index
prepared = action.prepare_for_chain(shared_context)
last_result = shared_context.last_result()
try:
if self.requires_io_injection() and last_result is not None:
result = await prepared(**{prepared.inject_last_result_as: last_result})
result = await prepared(
**{prepared.inject_last_result_as: last_result}
)
else:
result = await prepared(*args, **updated_kwargs)
except Exception as error:
if index + 1 < len(self.actions) and isinstance(self.actions[index + 1], FallbackAction):
logger.warning("[%s] ⚠️ Fallback triggered: %s, recovering with fallback '%s'.",
self.name, error, self.actions[index + 1].name)
if index + 1 < len(self.actions) and isinstance(
self.actions[index + 1], FallbackAction
):
logger.warning(
"[%s] ⚠️ Fallback triggered: %s, recovering with fallback '%s'.",
self.name,
error,
self.actions[index + 1].name,
)
shared_context.add_result(None)
context.extra["results"].append(None)
fallback = self.actions[index + 1].prepare_for_chain(shared_context)
fallback = self.actions[index + 1].prepare_for_chain(
shared_context
)
result = await fallback()
fallback._skip_in_chain = True
else:
@ -434,7 +458,9 @@ class ChainedAction(BaseAction, ActionListMixin):
context.extra["rollback_stack"].append(prepared)
all_results = context.extra["results"]
assert all_results, f"[{self.name}] No results captured. Something seriously went wrong."
assert (
all_results
), f"[{self.name}] No results captured. Something seriously went wrong."
context.result = all_results if self.return_list else all_results[-1]
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result
@ -528,6 +554,7 @@ class ActionGroup(BaseAction, ActionListMixin):
inject_last_result (bool, optional): Whether to inject last results into kwargs by default.
inject_last_result_as (str, optional): Key name for injection.
"""
def __init__(
self,
name: str,
@ -554,6 +581,7 @@ class ActionGroup(BaseAction, ActionListMixin):
extra={"results": [], "errors": []},
shared_context=shared_context,
)
async def run_one(action: BaseAction):
try:
prepared = action.prepare_for_group(shared_context)
@ -692,7 +720,9 @@ class ProcessAction(BaseAction):
er.record(context)
async def preview(self, parent: Tree | None = None):
label = [f"[{OneColors.DARK_YELLOW_b}]🧠 ProcessAction (new process)[/] '{self.name}'"]
label = [
f"[{OneColors.DARK_YELLOW_b}]🧠 ProcessAction (new process)[/] '{self.name}'"
]
if self.inject_last_result:
label.append(f" [dim](injects '{self.inject_last_result_as}')[/dim]")
if parent:
@ -703,6 +733,7 @@ class ProcessAction(BaseAction):
def _validate_pickleable(self, obj: Any) -> bool:
try:
import pickle
pickle.dumps(obj)
return True
except (pickle.PicklingError, TypeError):

View File

@ -45,11 +45,7 @@ class BottomBar:
def space(self) -> int:
return self.console.width // self.columns
def add_custom(
self,
name: str,
render_fn: Callable[[], HTML]
) -> None:
def add_custom(self, name: str, render_fn: Callable[[], HTML]) -> None:
"""Add a custom render function to the bottom bar."""
if not callable(render_fn):
raise ValueError("`render_fn` must be callable")
@ -63,9 +59,7 @@ class BottomBar:
bg: str = OneColors.WHITE,
) -> None:
def render():
return HTML(
f"<style fg='{fg}' bg='{bg}'>{text:^{self.space}}</style>"
)
return HTML(f"<style fg='{fg}' bg='{bg}'>{text:^{self.space}}</style>")
self._add_named(name, render)
@ -85,9 +79,7 @@ class BottomBar:
get_value_ = self._value_getters[name]
current_ = get_value_()
text = f"{label}: {current_}"
return HTML(
f"<style fg='{fg}' bg='{bg}'>{text:^{self.space}}</style>"
)
return HTML(f"<style fg='{fg}' bg='{bg}'>{text:^{self.space}}</style>")
self._add_named(name, render)
@ -114,9 +106,7 @@ class BottomBar:
f"Current value {current_value} is greater than total value {total}"
)
text = f"{label}: {current_value}/{total}"
return HTML(
f"<style fg='{fg}' bg='{bg}'>{text:^{self.space}}</style>"
)
return HTML(f"<style fg='{fg}' bg='{bg}'>{text:^{self.space}}</style>")
self._add_named(name, render)
@ -138,7 +128,9 @@ class BottomBar:
if key in self.toggle_keys:
raise ValueError(f"Key {key} is already used as a toggle")
if self.key_validator and not self.key_validator(key):
raise ValueError(f"Key '{key}' conflicts with existing command, toggle, or reserved key.")
raise ValueError(
f"Key '{key}' conflicts with existing command, toggle, or reserved key."
)
self._value_getters[key] = get_state
self.toggle_keys.append(key)
@ -147,9 +139,7 @@ class BottomBar:
color = bg_on if get_state_() else bg_off
status = "ON" if get_state_() else "OFF"
text = f"({key.upper()}) {label}: {status}"
return HTML(
f"<style bg='{color}' fg='{fg}'>{text:^{self.space}}</style>"
)
return HTML(f"<style bg='{color}' fg='{fg}'>{text:^{self.space}}</style>")
self._add_named(key, render)

View File

@ -91,6 +91,7 @@ class Command(BaseModel):
result: Property exposing the last result.
log_summary(): Summarizes execution details to the console.
"""
key: str
description: str
action: BaseAction | Callable[[], Any] = _noop
@ -127,12 +128,16 @@ class Command(BaseModel):
elif self.retry_policy and isinstance(self.action, Action):
self.action.set_retry_policy(self.retry_policy)
elif self.retry:
logger.warning(f"[Command:{self.key}] Retry requested, but action is not an Action instance.")
logger.warning(
f"[Command:{self.key}] Retry requested, but action is not an Action instance."
)
if self.retry_all and isinstance(self.action, BaseAction):
self.retry_policy.enabled = True
enable_retries_recursively(self.action, self.retry_policy)
elif self.retry_all:
logger.warning(f"[Command:{self.key}] Retry all requested, but action is not a BaseAction instance.")
logger.warning(
f"[Command:{self.key}] Retry all requested, but action is not a BaseAction instance."
)
if self.logging_hooks and isinstance(self.action, BaseAction):
register_debug_hooks(self.action.hooks)
@ -149,7 +154,11 @@ class Command(BaseModel):
if isinstance(self.action, BaseIOAction):
return True
elif isinstance(self.action, ChainedAction):
return isinstance(self.action.actions[0], BaseIOAction) if self.action.actions else False
return (
isinstance(self.action.actions[0], BaseIOAction)
if self.action.actions
else False
)
elif isinstance(self.action, ActionGroup):
return any(isinstance(action, BaseIOAction) for action in self.action.actions)
return False
@ -164,8 +173,10 @@ class Command(BaseModel):
raise TypeError("Action must be a callable or an instance of BaseAction")
def __str__(self):
return (f"Command(key='{self.key}', description='{self.description}' "
f"action='{self.action}')")
return (
f"Command(key='{self.key}', description='{self.description}' "
f"action='{self.action}')"
)
async def __call__(self, *args, **kwargs):
"""Run the action with full hook lifecycle, timing, and error handling."""
@ -208,9 +219,7 @@ class Command(BaseModel):
def confirmation_prompt(self) -> FormattedText:
"""Generate a styled prompt_toolkit FormattedText confirmation message."""
if self.confirm_message and self.confirm_message != "Are you sure?":
return FormattedText([
("class:confirm", self.confirm_message)
])
return FormattedText([("class:confirm", self.confirm_message)])
action_name = getattr(self.action, "__name__", None)
if isinstance(self.action, BaseAction):
@ -225,7 +234,9 @@ class Command(BaseModel):
prompt.append(("class:confirm", f"(calls `{action_name}`) "))
if self.args or self.kwargs:
prompt.append((OneColors.DARK_YELLOW, f"with args={self.args}, kwargs={self.kwargs} "))
prompt.append(
(OneColors.DARK_YELLOW, f"with args={self.args}, kwargs={self.kwargs} ")
)
return FormattedText(prompt)
@ -248,4 +259,6 @@ class Command(BaseModel):
)
else:
console.print(f"{label}")
console.print(f"[{OneColors.DARK_RED}]⚠️ Action is not callable or lacks a preview method.[/]")
console.print(
f"[{OneColors.DARK_RED}]⚠️ Action is not callable or lacks a preview method.[/]"
)

View File

@ -69,7 +69,6 @@ def loader(file_path: str) -> list[dict[str, Any]]:
if not isinstance(raw_config, list):
raise ValueError("Configuration file must contain a list of command definitions.")
required = ["key", "description", "action"]
commands = []
for entry in raw_config:
@ -80,8 +79,9 @@ def loader(file_path: str) -> list[dict[str, Any]]:
command_dict = {
"key": entry["key"],
"description": entry["description"],
"action": wrap_if_needed(import_action(entry["action"]),
name=entry["description"]),
"action": wrap_if_needed(
import_action(entry["action"]), name=entry["description"]
),
"args": tuple(entry.get("args", ())),
"kwargs": entry.get("kwargs", {}),
"hidden": entry.get("hidden", False),

View File

@ -87,7 +87,11 @@ class ExecutionContext(BaseModel):
def to_log_line(self) -> str:
"""Structured flat-line format for logging and metrics."""
duration_str = f"{self.duration:.3f}s" if self.duration is not None else "n/a"
exception_str = f"{type(self.exception).__name__}: {self.exception}" if self.exception else "None"
exception_str = (
f"{type(self.exception).__name__}: {self.exception}"
if self.exception
else "None"
)
return (
f"[{self.name}] status={self.status} duration={duration_str} "
f"result={repr(self.result)} exception={exception_str}"
@ -95,7 +99,11 @@ class ExecutionContext(BaseModel):
def __str__(self) -> str:
duration_str = f"{self.duration:.3f}s" if self.duration is not None else "n/a"
result_str = f"Result: {repr(self.result)}" if self.success else f"Exception: {self.exception}"
result_str = (
f"Result: {repr(self.result)}"
if self.success
else f"Exception: {self.exception}"
)
return (
f"<ExecutionContext '{self.name}' | {self.status} | "
f"Duration: {duration_str} | {result_str}>"
@ -153,6 +161,7 @@ class SharedContext(BaseModel):
f"Errors: {self.errors}>"
)
if __name__ == "__main__":
import asyncio

View File

@ -22,6 +22,6 @@ class NotAFalyxError(FalyxError):
class CircuitBreakerOpen(FalyxError):
"""Exception raised when the circuit breaker is open."""
class EmptyChainError(FalyxError):
"""Exception raised when the chain is empty."""

View File

@ -53,8 +53,16 @@ class ExecutionRegistry:
table.add_column("Result / Exception", overflow="fold")
for ctx in cls.get_all():
start = datetime.fromtimestamp(ctx.start_time).strftime("%H:%M:%S") if ctx.start_time else "n/a"
end = datetime.fromtimestamp(ctx.end_time).strftime("%H:%M:%S") if ctx.end_time else "n/a"
start = (
datetime.fromtimestamp(ctx.start_time).strftime("%H:%M:%S")
if ctx.start_time
else "n/a"
)
end = (
datetime.fromtimestamp(ctx.end_time).strftime("%H:%M:%S")
if ctx.end_time
else "n/a"
)
duration = f"{ctx.duration:.3f}s" if ctx.duration else "n/a"
if ctx.exception:
@ -74,6 +82,8 @@ class ExecutionRegistry:
def get_history_action(cls) -> "Action":
"""Return an Action that prints the execution summary."""
from falyx.action import Action
async def show_history():
cls.summary()
return Action(name="View Execution History", action=show_history)

View File

@ -42,16 +42,25 @@ from falyx.bottom_bar import BottomBar
from falyx.command import Command
from falyx.context import ExecutionContext
from falyx.debug import log_after, log_before, log_error, log_success
from falyx.exceptions import (CommandAlreadyExistsError, FalyxError,
InvalidActionError, NotAFalyxError)
from falyx.exceptions import (
CommandAlreadyExistsError,
FalyxError,
InvalidActionError,
NotAFalyxError,
)
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.options_manager import OptionsManager
from falyx.parsers import get_arg_parsers
from falyx.retry import RetryPolicy
from falyx.themes.colors import OneColors, get_nord_theme
from falyx.utils import (CaseInsensitiveDict, async_confirm, chunks,
get_program_invocation, logger)
from falyx.utils import (
CaseInsensitiveDict,
async_confirm,
chunks,
get_program_invocation,
logger,
)
from falyx.version import __version__
@ -101,6 +110,7 @@ class Falyx:
build_default_table(): Construct the standard Rich table layout.
"""
def __init__(
self,
title: str | Markdown = "Menu",
@ -117,6 +127,7 @@ class Falyx:
always_confirm: bool = False,
cli_args: Namespace | None = None,
options: OptionsManager | None = None,
render_menu: Callable[["Falyx"], None] | None = None,
custom_table: Callable[["Falyx"], Table] | Table | None = None,
) -> None:
"""Initializes the Falyx object."""
@ -125,8 +136,12 @@ class Falyx:
self.columns: int = columns
self.commands: dict[str, Command] = CaseInsensitiveDict()
self.exit_command: Command = self._get_exit_command()
self.history_command: Command | None = self._get_history_command() if include_history_command else None
self.help_command: Command | None = self._get_help_command() if include_help_command else None
self.history_command: Command | None = (
self._get_history_command() if include_history_command else None
)
self.help_command: Command | None = (
self._get_help_command() if include_help_command else None
)
self.console: Console = Console(color_system="truecolor", theme=get_nord_theme())
self.welcome_message: str | Markdown | dict[str, Any] = welcome_message
self.exit_message: str | Markdown | dict[str, Any] = exit_message
@ -138,6 +153,7 @@ class Falyx:
self._never_confirm: bool = never_confirm
self._always_confirm: bool = always_confirm
self.cli_args: Namespace | None = cli_args
self.render_menu: Callable[["Falyx"], None] | None = render_menu
self.custom_table: Callable[["Falyx"], Table] | Table | None = custom_table
self.set_options(cli_args, options)
self._session: PromptSession | None = None
@ -155,7 +171,9 @@ class Falyx:
if options and not cli_args:
raise FalyxError("Options are set, but CLI arguments are not.")
assert isinstance(cli_args, Namespace), "CLI arguments must be a Namespace object."
assert isinstance(
cli_args, Namespace
), "CLI arguments must be a Namespace object."
if options is None:
self.options.from_namespace(cli_args, "cli_args")
@ -240,27 +258,27 @@ class Falyx:
f"[{command.color}]{command.key}[/]",
", ".join(command.aliases) if command.aliases else "None",
help_text,
", ".join(command.tags) if command.tags else "None"
", ".join(command.tags) if command.tags else "None",
)
table.add_row(
f"[{self.exit_command.color}]{self.exit_command.key}[/]",
", ".join(self.exit_command.aliases),
"Exit this menu or program"
"Exit this menu or program",
)
if self.history_command:
table.add_row(
f"[{self.history_command.color}]{self.history_command.key}[/]",
", ".join(self.history_command.aliases),
"History of executed actions"
"History of executed actions",
)
if self.help_command:
table.add_row(
f"[{self.help_command.color}]{self.help_command.key}[/]",
", ".join(self.help_command.aliases),
"Show this help menu"
"Show this help menu",
)
self.console.print(table, justify="center")
@ -274,6 +292,7 @@ class Falyx:
action=self._show_help,
color=OneColors.LIGHT_YELLOW,
)
def _get_completer(self) -> WordCompleter:
"""Completer to provide auto-completion for the menu commands."""
keys = [self.exit_command.key]
@ -353,15 +372,19 @@ class Falyx:
def bottom_bar(self, bottom_bar: BottomBar | str | Callable[[], Any] | None) -> None:
"""Sets the bottom bar for the menu."""
if bottom_bar is None:
self._bottom_bar: BottomBar | str | Callable[[], Any] = BottomBar(self.columns, self.key_bindings, key_validator=self.is_key_available)
self._bottom_bar: BottomBar | str | Callable[[], Any] = BottomBar(
self.columns, self.key_bindings, key_validator=self.is_key_available
)
elif isinstance(bottom_bar, BottomBar):
bottom_bar.key_validator = self.is_key_available
bottom_bar.key_bindings = self.key_bindings
self._bottom_bar = bottom_bar
elif (isinstance(bottom_bar, str) or callable(bottom_bar)):
elif isinstance(bottom_bar, str) or callable(bottom_bar):
self._bottom_bar = bottom_bar
else:
raise FalyxError("Bottom bar must be a string, callable, or BottomBar instance.")
raise FalyxError(
"Bottom bar must be a string, callable, or BottomBar instance."
)
self._invalidate_session_cache()
def _get_bottom_bar_render(self) -> Callable[[], Any] | str | None:
@ -414,32 +437,58 @@ class Falyx:
def debug_hooks(self) -> None:
"""Logs the names of all hooks registered for the menu and its commands."""
def hook_names(hook_list):
return [hook.__name__ for hook in hook_list]
logger.debug(f"Menu-level before hooks: {hook_names(self.hooks._hooks[HookType.BEFORE])}")
logger.debug(f"Menu-level success hooks: {hook_names(self.hooks._hooks[HookType.ON_SUCCESS])}")
logger.debug(f"Menu-level error hooks: {hook_names(self.hooks._hooks[HookType.ON_ERROR])}")
logger.debug(f"Menu-level after hooks: {hook_names(self.hooks._hooks[HookType.AFTER])}")
logger.debug(f"Menu-level on_teardown hooks: {hook_names(self.hooks._hooks[HookType.ON_TEARDOWN])}")
logger.debug(
"Menu-level before hooks: "
f"{hook_names(self.hooks._hooks[HookType.BEFORE])}"
)
logger.debug(
f"Menu-level success hooks: {hook_names(self.hooks._hooks[HookType.ON_SUCCESS])}"
)
logger.debug(
f"Menu-level error hooks: {hook_names(self.hooks._hooks[HookType.ON_ERROR])}"
)
logger.debug(
f"Menu-level after hooks: {hook_names(self.hooks._hooks[HookType.AFTER])}"
)
logger.debug(
f"Menu-level on_teardown hooks: {hook_names(self.hooks._hooks[HookType.ON_TEARDOWN])}"
)
for key, command in self.commands.items():
logger.debug(f"[Command '{key}'] before: {hook_names(command.hooks._hooks[HookType.BEFORE])}")
logger.debug(f"[Command '{key}'] success: {hook_names(command.hooks._hooks[HookType.ON_SUCCESS])}")
logger.debug(f"[Command '{key}'] error: {hook_names(command.hooks._hooks[HookType.ON_ERROR])}")
logger.debug(f"[Command '{key}'] after: {hook_names(command.hooks._hooks[HookType.AFTER])}")
logger.debug(f"[Command '{key}'] on_teardown: {hook_names(command.hooks._hooks[HookType.ON_TEARDOWN])}")
logger.debug(
f"[Command '{key}'] before: {hook_names(command.hooks._hooks[HookType.BEFORE])}"
)
logger.debug(
f"[Command '{key}'] success: {hook_names(command.hooks._hooks[HookType.ON_SUCCESS])}"
)
logger.debug(
f"[Command '{key}'] error: {hook_names(command.hooks._hooks[HookType.ON_ERROR])}"
)
logger.debug(
f"[Command '{key}'] after: {hook_names(command.hooks._hooks[HookType.AFTER])}"
)
logger.debug(
f"[Command '{key}'] on_teardown: {hook_names(command.hooks._hooks[HookType.ON_TEARDOWN])}"
)
def is_key_available(self, key: str) -> bool:
key = key.upper()
toggles = self._bottom_bar.toggle_keys if isinstance(self._bottom_bar, BottomBar) else []
toggles = (
self._bottom_bar.toggle_keys
if isinstance(self._bottom_bar, BottomBar)
else []
)
conflicts = (
key in self.commands,
key == self.exit_command.key.upper(),
self.history_command and key == self.history_command.key.upper(),
self.help_command and key == self.help_command.key.upper(),
key in toggles
key in toggles,
)
return not any(conflicts)
@ -447,7 +496,11 @@ class Falyx:
def _validate_command_key(self, key: str) -> None:
"""Validates the command key to ensure it is unique."""
key = key.upper()
toggles = self._bottom_bar.toggle_keys if isinstance(self._bottom_bar, BottomBar) else []
toggles = (
self._bottom_bar.toggle_keys
if isinstance(self._bottom_bar, BottomBar)
else []
)
collisions = []
if key in self.commands:
@ -462,7 +515,9 @@ class Falyx:
collisions.append("toggle")
if collisions:
raise CommandAlreadyExistsError(f"Command key '{key}' conflicts with existing {', '.join(collisions)}.")
raise CommandAlreadyExistsError(
f"Command key '{key}' conflicts with existing {', '.join(collisions)}."
)
def update_exit_command(
self,
@ -486,7 +541,9 @@ class Falyx:
confirm_message=confirm_message,
)
def add_submenu(self, key: str, description: str, submenu: "Falyx", color: str = OneColors.CYAN) -> None:
def add_submenu(
self, key: str, description: str, submenu: "Falyx", color: str = OneColors.CYAN
) -> None:
"""Adds a submenu to the menu."""
if not isinstance(submenu, Falyx):
raise NotAFalyxError("submenu must be an instance of Falyx.")
@ -581,10 +638,16 @@ class Falyx:
"""Returns the bottom row of the table for displaying additional commands."""
bottom_row = []
if self.history_command:
bottom_row.append(f"[{self.history_command.key}] [{self.history_command.color}]{self.history_command.description}")
bottom_row.append(
f"[{self.history_command.key}] [{self.history_command.color}]{self.history_command.description}"
)
if self.help_command:
bottom_row.append(f"[{self.help_command.key}] [{self.help_command.color}]{self.help_command.description}")
bottom_row.append(f"[{self.exit_command.key}] [{self.exit_command.color}]{self.exit_command.description}")
bottom_row.append(
f"[{self.help_command.key}] [{self.help_command.color}]{self.help_command.description}"
)
bottom_row.append(
f"[{self.exit_command.key}] [{self.exit_command.color}]{self.exit_command.description}"
)
return bottom_row
def build_default_table(self) -> Table:
@ -626,13 +689,17 @@ class Falyx:
fuzzy_matches = get_close_matches(choice, list(name_map.keys()), n=3, cutoff=0.7)
if fuzzy_matches:
if not from_validate:
self.console.print(f"[{OneColors.LIGHT_YELLOW}]⚠️ Unknown command '{choice}'. Did you mean:[/] ")
self.console.print(
f"[{OneColors.LIGHT_YELLOW}]⚠️ Unknown command '{choice}'. Did you mean:[/] "
)
for match in fuzzy_matches:
cmd = name_map[match]
self.console.print(f" • [bold]{match}[/] → {cmd.description}")
else:
if not from_validate:
self.console.print(f"[{OneColors.LIGHT_YELLOW}]⚠️ Unknown command '{choice}'[/]")
self.console.print(
f"[{OneColors.LIGHT_YELLOW}]⚠️ Unknown command '{choice}'[/]"
)
return None
async def _should_run_action(self, selected_command: Command) -> bool:
@ -642,9 +709,11 @@ class Falyx:
if self.cli_args and getattr(self.cli_args, "skip_confirm", False):
return True
if (self._always_confirm or
selected_command.confirm or
self.cli_args and getattr(self.cli_args, "force_confirm", False)
if (
self._always_confirm
or selected_command.confirm
or self.cli_args
and getattr(self.cli_args, "force_confirm", False)
):
if selected_command.preview_before_confirm:
await selected_command.preview()
@ -676,11 +745,15 @@ class Falyx:
):
return await command()
async def _handle_action_error(self, selected_command: Command, error: Exception) -> bool:
async def _handle_action_error(
self, selected_command: Command, error: Exception
) -> bool:
"""Handles errors that occur during the action of the selected command."""
logger.exception(f"Error executing '{selected_command.description}': {error}")
self.console.print(f"[{OneColors.DARK_RED}]An error occurred while executing "
f"{selected_command.description}:[/] {error}")
self.console.print(
f"[{OneColors.DARK_RED}]An error occurred while executing "
f"{selected_command.description}:[/] {error}"
)
if self.confirm_on_error and not self._never_confirm:
return await async_confirm("An error occurred. Do you wish to continue?")
if self._never_confirm:
@ -701,7 +774,6 @@ class Falyx:
f"[{OneColors.LIGHT_YELLOW}]⚠️ Command '{selected_command.key}' requires input "
f"and must be run via [{OneColors.MAGENTA}]'{program} run'[{OneColors.LIGHT_YELLOW}] "
"with proper piping or arguments.[/]"
)
return True
@ -730,7 +802,9 @@ class Falyx:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
if not context.exception:
logger.info(f"✅ Recovery hook handled error for '{selected_command.description}'")
logger.info(
f"✅ Recovery hook handled error for '{selected_command.description}'"
)
context.result = result
else:
return await self._handle_action_error(selected_command, error)
@ -753,7 +827,9 @@ class Falyx:
logger.info(f"[Headless] 🚀 Running: '{selected_command.description}'")
if not await self._should_run_action(selected_command):
raise FalyxError(f"[Headless] '{selected_command.description}' cancelled by confirmation.")
raise FalyxError(
f"[Headless] '{selected_command.description}' cancelled by confirmation."
)
context = self._create_context(selected_command)
context.start_timer()
@ -769,14 +845,20 @@ class Falyx:
await self.hooks.trigger(HookType.ON_SUCCESS, context)
logger.info(f"[Headless] ✅ '{selected_command.description}' complete.")
except (KeyboardInterrupt, EOFError):
raise FalyxError(f"[Headless] ⚠️ '{selected_command.description}' interrupted by user.")
raise FalyxError(
f"[Headless] ⚠️ '{selected_command.description}' interrupted by user."
)
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
if not context.exception:
logger.info(f"[Headless] ✅ Recovery hook handled error for '{selected_command.description}'")
logger.info(
f"[Headless] ✅ Recovery hook handled error for '{selected_command.description}'"
)
return True
raise FalyxError(f"[Headless] ❌ '{selected_command.description}' failed.") from error
raise FalyxError(
f"[Headless] ❌ '{selected_command.description}' failed."
) from error
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
@ -787,7 +869,11 @@ class Falyx:
def _set_retry_policy(self, selected_command: Command) -> None:
"""Sets the retry policy for the command based on CLI arguments."""
assert isinstance(self.cli_args, Namespace), "CLI arguments must be provided."
if self.cli_args.retries or self.cli_args.retry_delay or self.cli_args.retry_backoff:
if (
self.cli_args.retries
or self.cli_args.retry_delay
or self.cli_args.retry_backoff
):
selected_command.retry_policy.enabled = True
if self.cli_args.retries:
selected_command.retry_policy.max_retries = self.cli_args.retries
@ -798,7 +884,9 @@ class Falyx:
if isinstance(selected_command.action, Action):
selected_command.action.set_retry_policy(selected_command.retry_policy)
else:
logger.warning(f"[Command:{selected_command.key}] Retry requested, but action is not an Action instance.")
logger.warning(
f"[Command:{selected_command.key}] Retry requested, but action is not an Action instance."
)
def print_message(self, message: str | Markdown | dict[str, Any]) -> None:
"""Prints a message to the console."""
@ -821,6 +909,9 @@ class Falyx:
if self.welcome_message:
self.print_message(self.welcome_message)
while True:
if callable(self.render_menu):
self.render_menu(self)
elif isinstance(self.render_menu, str):
self.console.print(self.table, justify="center")
try:
task = asyncio.create_task(self.process_command())
@ -858,16 +949,22 @@ class Falyx:
if self.cli_args.command == "preview":
command = self.get_command(self.cli_args.name)
if not command:
self.console.print(f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found.[/]")
self.console.print(
f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found.[/]"
)
sys.exit(1)
self.console.print(f"Preview of command '{command.key}': {command.description}")
self.console.print(
f"Preview of command '{command.key}': {command.description}"
)
await command.preview()
sys.exit(0)
if self.cli_args.command == "run":
command = self.get_command(self.cli_args.name)
if not command:
self.console.print(f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found.[/]")
self.console.print(
f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found.[/]"
)
sys.exit(1)
self._set_retry_policy(command)
try:
@ -879,14 +976,19 @@ class Falyx:
if self.cli_args.command == "run-all":
matching = [
cmd for cmd in self.commands.values()
cmd
for cmd in self.commands.values()
if self.cli_args.tag.lower() in (tag.lower() for tag in cmd.tags)
]
if not matching:
self.console.print(f"[{OneColors.LIGHT_YELLOW}]⚠️ No commands found with tag: '{self.cli_args.tag}'[/]")
self.console.print(
f"[{OneColors.LIGHT_YELLOW}]⚠️ No commands found with tag: '{self.cli_args.tag}'[/]"
)
sys.exit(1)
self.console.print(f"[{OneColors.CYAN_b}]🚀 Running all commands with tag:[/] {self.cli_args.tag}")
self.console.print(
f"[{OneColors.CYAN_b}]🚀 Running all commands with tag:[/] {self.cli_args.tag}"
)
for cmd in matching:
self._set_retry_policy(cmd)
await self.headless(cmd.key)

View File

@ -10,13 +10,13 @@ from falyx.context import ExecutionContext
from falyx.utils import logger
Hook = Union[
Callable[[ExecutionContext], None],
Callable[[ExecutionContext], Awaitable[None]]
Callable[[ExecutionContext], None], Callable[[ExecutionContext], Awaitable[None]]
]
class HookType(Enum):
"""Enum for hook types to categorize the hooks."""
BEFORE = "before"
ON_SUCCESS = "on_success"
ON_ERROR = "on_error"
@ -61,10 +61,13 @@ class HookManager:
else:
hook(context)
except Exception as hook_error:
logger.warning(f"⚠️ Hook '{hook.__name__}' raised an exception during '{hook_type}'"
f" for '{context.name}': {hook_error}")
logger.warning(
f"⚠️ Hook '{hook.__name__}' raised an exception during '{hook_type}'"
f" for '{context.name}': {hook_error}"
)
if hook_type == HookType.ON_ERROR:
assert isinstance(context.exception, Exception), "Context exception should be set for ON_ERROR hook"
assert isinstance(
context.exception, Exception
), "Context exception should be set for ON_ERROR hook"
raise context.exception from hook_error

View File

@ -25,9 +25,13 @@ class ResultReporter:
raise TypeError("formatter must be callable")
if context.result is not None:
result_text = self.formatter(context.result)
duration = f"{context.duration:.3f}s" if context.duration is not None else "n/a"
context.console.print(f"[{OneColors.GREEN}]✅ '{context.name}' "
f"completed:[/] {result_text} in {duration}.")
duration = (
f"{context.duration:.3f}s" if context.duration is not None else "n/a"
)
context.console.print(
f"[{OneColors.GREEN}]✅ '{context.name}' "
f"completed:[/] {result_text} in {duration}."
)
class CircuitBreaker:
@ -41,7 +45,9 @@ class CircuitBreaker:
name = context.name
if self.open_until:
if time.time() < self.open_until:
raise CircuitBreakerOpen(f"🔴 Circuit open for '{name}' until {time.ctime(self.open_until)}.")
raise CircuitBreakerOpen(
f"🔴 Circuit open for '{name}' until {time.ctime(self.open_until)}."
)
else:
logger.info(f"🟢 Circuit closed again for '{name}'.")
self.failures = 0
@ -50,10 +56,14 @@ class CircuitBreaker:
def error_hook(self, context: ExecutionContext):
name = context.name
self.failures += 1
logger.warning(f"⚠️ CircuitBreaker: '{name}' failure {self.failures}/{self.max_failures}.")
logger.warning(
f"⚠️ CircuitBreaker: '{name}' failure {self.failures}/{self.max_failures}."
)
if self.failures >= self.max_failures:
self.open_until = time.time() + self.reset_timeout
logger.error(f"🔴 Circuit opened for '{name}' until {time.ctime(self.open_until)}.")
logger.error(
f"🔴 Circuit opened for '{name}' until {time.ctime(self.open_until)}."
)
def after_hook(self, context: ExecutionContext):
self.failures = 0

View File

@ -59,6 +59,7 @@ class HTTPAction(Action):
retry (bool): Enable retry logic.
retry_policy (RetryPolicy): Retry settings.
"""
def __init__(
self,
name: str,

View File

@ -58,6 +58,7 @@ class BaseIOAction(BaseAction):
mode (str): Either "buffered" or "stream". Controls input behavior.
inject_last_result (bool): Whether to inject shared context input.
"""
def __init__(
self,
name: str,
@ -94,7 +95,9 @@ class BaseIOAction(BaseAction):
if self.inject_last_result and self.shared_context:
return self.shared_context.last_result()
logger.debug("[%s] No input provided and no last result found for injection.", self.name)
logger.debug(
"[%s] No input provided and no last result found for injection.", self.name
)
raise FalyxError("No input provided and no last result to inject.")
async def __call__(self, *args, **kwargs):
@ -137,7 +140,6 @@ class BaseIOAction(BaseAction):
return await asyncio.to_thread(sys.stdin.read)
return ""
async def _read_stdin_stream(self) -> Any:
"""Returns a generator that yields lines from stdin in a background thread."""
loop = asyncio.get_running_loop()
@ -176,7 +178,9 @@ class BaseIOAction(BaseAction):
class UppercaseIO(BaseIOAction):
def from_input(self, raw: str | bytes) -> str:
if not isinstance(raw, (str, bytes)):
raise TypeError(f"{self.name} expected str or bytes input, got {type(raw).__name__}")
raise TypeError(
f"{self.name} expected str or bytes input, got {type(raw).__name__}"
)
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
async def _run(self, parsed_input: str, *args, **kwargs) -> str:
@ -213,21 +217,22 @@ class ShellAction(BaseIOAction):
command_template (str): Shell command to execute. Must include `{}` to include input.
If no placeholder is present, the input is not included.
"""
def __init__(self, name: str, command_template: str, **kwargs):
super().__init__(name=name, **kwargs)
self.command_template = command_template
def from_input(self, raw: str | bytes) -> str:
if not isinstance(raw, (str, bytes)):
raise TypeError(f"{self.name} expected str or bytes input, got {type(raw).__name__}")
raise TypeError(
f"{self.name} expected str or bytes input, got {type(raw).__name__}"
)
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
async def _run(self, parsed_input: str) -> str:
# Replace placeholder in template, or use raw input as full command
command = self.command_template.format(parsed_input)
result = subprocess.run(
command, shell=True, text=True, capture_output=True
)
result = subprocess.run(command, shell=True, text=True, capture_output=True)
if result.returncode != 0:
raise RuntimeError(result.stderr.strip())
return result.stdout.strip()
@ -245,7 +250,10 @@ class ShellAction(BaseIOAction):
console.print(Tree("".join(label)))
def __str__(self):
return f"ShellAction(name={self.name!r}, command_template={self.command_template!r})"
return (
f"ShellAction(name={self.name!r}, command_template={self.command_template!r})"
)
class GrepAction(BaseIOAction):
def __init__(self, name: str, pattern: str, **kwargs):
@ -254,13 +262,19 @@ class GrepAction(BaseIOAction):
def from_input(self, raw: str | bytes) -> str:
if not isinstance(raw, (str, bytes)):
raise TypeError(f"{self.name} expected str or bytes input, got {type(raw).__name__}")
raise TypeError(
f"{self.name} expected str or bytes input, got {type(raw).__name__}"
)
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
async def _run(self, parsed_input: str) -> str:
command = ["grep", "-n", self.pattern]
process = subprocess.Popen(
command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdout, stderr = process.communicate(input=parsed_input)
if process.returncode == 1:
@ -271,4 +285,3 @@ class GrepAction(BaseIOAction):
def to_output(self, result: str) -> str:
return result

View File

@ -26,9 +26,7 @@ class OptionsManager:
"""Get the value of an option."""
return getattr(self.options[namespace_name], option_name, default)
def set(
self, option_name: str, value: Any, namespace_name: str = "cli_args"
) -> None:
def set(self, option_name: str, value: Any, namespace_name: str = "cli_args") -> None:
"""Set the value of an option."""
setattr(self.options[namespace_name], option_name, value)

View File

@ -10,6 +10,7 @@ from typing import Any, Sequence
@dataclass
class FalyxParsers:
"""Defines the argument parsers for the Falyx CLI."""
root: ArgumentParser
run: ArgumentParser
run_all: ArgumentParser
@ -31,7 +32,7 @@ class FalyxParsers:
def get_arg_parsers(
prog: str |None = "falyx",
prog: str | None = "falyx",
usage: str | None = None,
description: str | None = "Falyx CLI - Run structured async command workflows.",
epilog: str | None = None,
@ -44,7 +45,7 @@ def get_arg_parsers(
add_help: bool = True,
allow_abbrev: bool = True,
exit_on_error: bool = True,
) -> FalyxParsers:
) -> FalyxParsers:
"""Returns the argument parser for the CLI."""
parser = ArgumentParser(
prog=prog,
@ -61,33 +62,87 @@ def get_arg_parsers(
allow_abbrev=allow_abbrev,
exit_on_error=exit_on_error,
)
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging for Falyx.")
parser.add_argument("--debug-hooks", action="store_true", help="Enable default lifecycle debug logging")
parser.add_argument(
"-v", "--verbose", action="store_true", help="Enable debug logging for Falyx."
)
parser.add_argument(
"--debug-hooks",
action="store_true",
help="Enable default lifecycle debug logging",
)
parser.add_argument("--version", action="store_true", help="Show Falyx version")
subparsers = parser.add_subparsers(dest="command")
run_parser = subparsers.add_parser("run", help="Run a specific command")
run_parser.add_argument("name", help="Key, alias, or description of the command")
run_parser.add_argument("--retries", type=int, help="Number of retries on failure", default=0)
run_parser.add_argument("--retry-delay", type=float, help="Initial delay between retries in (seconds)", default=0)
run_parser.add_argument("--retry-backoff", type=float, help="Backoff factor for retries", default=0)
run_parser.add_argument(
"--retries", type=int, help="Number of retries on failure", default=0
)
run_parser.add_argument(
"--retry-delay",
type=float,
help="Initial delay between retries in (seconds)",
default=0,
)
run_parser.add_argument(
"--retry-backoff", type=float, help="Backoff factor for retries", default=0
)
run_group = run_parser.add_mutually_exclusive_group(required=False)
run_group.add_argument("-c", "--confirm", dest="force_confirm", action="store_true", help="Force confirmation prompts")
run_group.add_argument("-s", "--skip-confirm", dest="skip_confirm", action="store_true", help="Skip confirmation prompts")
run_group.add_argument(
"-c",
"--confirm",
dest="force_confirm",
action="store_true",
help="Force confirmation prompts",
)
run_group.add_argument(
"-s",
"--skip-confirm",
dest="skip_confirm",
action="store_true",
help="Skip confirmation prompts",
)
run_all_parser = subparsers.add_parser("run-all", help="Run all commands with a given tag")
run_all_parser = subparsers.add_parser(
"run-all", help="Run all commands with a given tag"
)
run_all_parser.add_argument("-t", "--tag", required=True, help="Tag to match")
run_all_parser.add_argument("--retries", type=int, help="Number of retries on failure", default=0)
run_all_parser.add_argument("--retry-delay", type=float, help="Initial delay between retries in (seconds)", default=0)
run_all_parser.add_argument("--retry-backoff", type=float, help="Backoff factor for retries", default=0)
run_all_parser.add_argument(
"--retries", type=int, help="Number of retries on failure", default=0
)
run_all_parser.add_argument(
"--retry-delay",
type=float,
help="Initial delay between retries in (seconds)",
default=0,
)
run_all_parser.add_argument(
"--retry-backoff", type=float, help="Backoff factor for retries", default=0
)
run_all_group = run_all_parser.add_mutually_exclusive_group(required=False)
run_all_group.add_argument("-c", "--confirm", dest="force_confirm", action="store_true", help="Force confirmation prompts")
run_all_group.add_argument("-s", "--skip-confirm", dest="skip_confirm", action="store_true", help="Skip confirmation prompts")
run_all_group.add_argument(
"-c",
"--confirm",
dest="force_confirm",
action="store_true",
help="Force confirmation prompts",
)
run_all_group.add_argument(
"-s",
"--skip-confirm",
dest="skip_confirm",
action="store_true",
help="Skip confirmation prompts",
)
preview_parser = subparsers.add_parser("preview", help="Preview a command without running it")
preview_parser = subparsers.add_parser(
"preview", help="Preview a command without running it"
)
preview_parser.add_argument("name", help="Key, alias, or description of the command")
list_parser = subparsers.add_parser("list", help="List all available commands with tags")
list_parser = subparsers.add_parser(
"list", help="List all available commands with tags"
)
version_parser = subparsers.add_parser("version", help="Show the Falyx version")

View File

@ -34,15 +34,15 @@ class RetryPolicy(BaseModel):
class RetryHandler:
def __init__(self, policy: RetryPolicy=RetryPolicy()):
def __init__(self, policy: RetryPolicy = RetryPolicy()):
self.policy = policy
def enable_policy(
self,
max_retries: int=3,
delay: float=1.0,
backoff: float=2.0,
jitter: float=0.0,
max_retries: int = 3,
delay: float = 1.0,
backoff: float = 2.0,
jitter: float = 0.0,
):
self.policy.enabled = True
self.policy.max_retries = max_retries
@ -53,6 +53,7 @@ class RetryHandler:
async def retry_on_error(self, context: ExecutionContext):
from falyx.action import Action
name = context.name
error = context.exception
target = context.action
@ -66,7 +67,9 @@ class RetryHandler:
return
if not isinstance(target, Action):
logger.warning(f"[{name}] ❌ RetryHandler only supports only supports Action objects.")
logger.warning(
f"[{name}] ❌ RetryHandler only supports only supports Action objects."
)
return
if not getattr(target, "is_retryable", False):

View File

@ -17,6 +17,7 @@ Example dynamic usage:
console.print("Hello!", style=NordColors.NORD12bu)
# => Renders "Hello!" in #D08770 (Nord12) plus bold and underline styles
"""
import re
from difflib import get_close_matches
@ -82,14 +83,17 @@ class ColorsMeta(type):
except AttributeError:
error_msg = [f"'{cls.__name__}' has no color named '{base}'."]
valid_bases = [
key for key, val in cls.__dict__.items() if isinstance(val, str) and
not key.startswith("__")
key
for key, val in cls.__dict__.items()
if isinstance(val, str) and not key.startswith("__")
]
suggestions = get_close_matches(base, valid_bases, n=1, cutoff=0.5)
if suggestions:
error_msg.append(f"Did you mean '{suggestions[0]}'?")
if valid_bases:
error_msg.append(f"Valid base color names include: {', '.join(valid_bases)}")
error_msg.append(
f"Valid base color names include: {', '.join(valid_bases)}"
)
raise AttributeError(" ".join(error_msg)) from None
if not isinstance(color_value, str):
@ -105,7 +109,9 @@ class ColorsMeta(type):
if mapped_style:
styles.append(mapped_style)
else:
raise AttributeError(f"Unknown style flag '{letter}' in attribute '{name}'")
raise AttributeError(
f"Unknown style flag '{letter}' in attribute '{name}'"
)
order = {"b": 1, "i": 2, "u": 3, "d": 4, "r": 5, "s": 6}
styles_sorted = sorted(styles, key=lambda s: order[s[0]])
@ -133,7 +139,6 @@ class OneColors(metaclass=ColorsMeta):
BLUE = "#61AFEF"
MAGENTA = "#C678DD"
@classmethod
def as_dict(cls):
"""
@ -143,10 +148,10 @@ class OneColors(metaclass=ColorsMeta):
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if not callable(getattr(cls, attr)) and
not attr.startswith("__")
if not callable(getattr(cls, attr)) and not attr.startswith("__")
}
class NordColors(metaclass=ColorsMeta):
"""
Defines the Nord color palette as class attributes.
@ -215,8 +220,7 @@ class NordColors(metaclass=ColorsMeta):
return {
attr: getattr(cls, attr)
for attr in dir(cls)
if attr.startswith("NORD") and
not callable(getattr(cls, attr))
if attr.startswith("NORD") and not callable(getattr(cls, attr))
}
@classmethod
@ -227,7 +231,8 @@ class NordColors(metaclass=ColorsMeta):
"""
skip_prefixes = ("NORD", "__")
alias_names = [
attr for attr in dir(cls)
attr
for attr in dir(cls)
if not any(attr.startswith(sp) for sp in skip_prefixes)
and not callable(getattr(cls, attr))
]
@ -264,7 +269,6 @@ NORD_THEME_STYLES: dict[str, Style] = {
"blink2": Style(blink2=True),
"reverse": Style(reverse=True),
"strike": Style(strike=True),
# ---------------------------------------------------------------
# Basic color names mapped to Nord
# ---------------------------------------------------------------
@ -277,7 +281,6 @@ NORD_THEME_STYLES: dict[str, Style] = {
"cyan": Style(color=NordColors.CYAN),
"blue": Style(color=NordColors.BLUE),
"white": Style(color=NordColors.SNOW_STORM_BRIGHTEST),
# ---------------------------------------------------------------
# Inspect
# ---------------------------------------------------------------
@ -292,14 +295,12 @@ NORD_THEME_STYLES: dict[str, Style] = {
"inspect.help": Style(color=NordColors.FROST_ICE),
"inspect.doc": Style(dim=True),
"inspect.value.border": Style(color=NordColors.GREEN),
# ---------------------------------------------------------------
# Live / Layout
# ---------------------------------------------------------------
"live.ellipsis": Style(bold=True, color=NordColors.RED),
"layout.tree.row": Style(dim=False, color=NordColors.RED),
"layout.tree.column": Style(dim=False, color=NordColors.FROST_DEEP),
# ---------------------------------------------------------------
# Logging
# ---------------------------------------------------------------
@ -314,7 +315,6 @@ NORD_THEME_STYLES: dict[str, Style] = {
"log.time": Style(color=NordColors.FROST_ICE, dim=True),
"log.message": Style.null(),
"log.path": Style(dim=True),
# ---------------------------------------------------------------
# Python repr
# ---------------------------------------------------------------
@ -340,18 +340,18 @@ NORD_THEME_STYLES: dict[str, Style] = {
"repr.bool_true": Style(color=NordColors.GREEN, italic=True),
"repr.bool_false": Style(color=NordColors.RED, italic=True),
"repr.none": Style(color=NordColors.PURPLE, italic=True),
"repr.url": Style(underline=True, color=NordColors.FROST_ICE, italic=False, bold=False),
"repr.url": Style(
underline=True, color=NordColors.FROST_ICE, italic=False, bold=False
),
"repr.uuid": Style(color=NordColors.YELLOW, bold=False),
"repr.call": Style(color=NordColors.PURPLE, bold=True),
"repr.path": Style(color=NordColors.PURPLE),
"repr.filename": Style(color=NordColors.PURPLE),
# ---------------------------------------------------------------
# Rule
# ---------------------------------------------------------------
"rule.line": Style(color=NordColors.GREEN),
"rule.text": Style.null(),
# ---------------------------------------------------------------
# JSON
# ---------------------------------------------------------------
@ -362,7 +362,6 @@ NORD_THEME_STYLES: dict[str, Style] = {
"json.number": Style(color=NordColors.FROST_ICE, bold=True, italic=False),
"json.str": Style(color=NordColors.GREEN, italic=False, bold=False),
"json.key": Style(color=NordColors.FROST_ICE, bold=True),
# ---------------------------------------------------------------
# Prompt
# ---------------------------------------------------------------
@ -371,12 +370,10 @@ NORD_THEME_STYLES: dict[str, Style] = {
"prompt.default": Style(color=NordColors.FROST_ICE, bold=True),
"prompt.invalid": Style(color=NordColors.RED),
"prompt.invalid.choice": Style(color=NordColors.RED),
# ---------------------------------------------------------------
# Pretty
# ---------------------------------------------------------------
"pretty": Style.null(),
# ---------------------------------------------------------------
# Scope
# ---------------------------------------------------------------
@ -384,7 +381,6 @@ NORD_THEME_STYLES: dict[str, Style] = {
"scope.key": Style(color=NordColors.YELLOW, italic=True),
"scope.key.special": Style(color=NordColors.YELLOW, italic=True, dim=True),
"scope.equals": Style(color=NordColors.RED),
# ---------------------------------------------------------------
# Table
# ---------------------------------------------------------------
@ -393,7 +389,6 @@ NORD_THEME_STYLES: dict[str, Style] = {
"table.cell": Style.null(),
"table.title": Style(italic=True),
"table.caption": Style(italic=True, dim=True),
# ---------------------------------------------------------------
# Traceback
# ---------------------------------------------------------------
@ -405,7 +400,6 @@ NORD_THEME_STYLES: dict[str, Style] = {
"traceback.exc_type": Style(color=NordColors.RED, bold=True),
"traceback.exc_value": Style.null(),
"traceback.offset": Style(color=NordColors.RED, bold=True),
# ---------------------------------------------------------------
# Progress bars
# ---------------------------------------------------------------
@ -423,13 +417,11 @@ NORD_THEME_STYLES: dict[str, Style] = {
"progress.data.speed": Style(color=NordColors.RED),
"progress.spinner": Style(color=NordColors.GREEN),
"status.spinner": Style(color=NordColors.GREEN),
# ---------------------------------------------------------------
# Tree
# ---------------------------------------------------------------
"tree": Style(),
"tree.line": Style(),
# ---------------------------------------------------------------
# Markdown
# ---------------------------------------------------------------
@ -438,8 +430,12 @@ NORD_THEME_STYLES: dict[str, Style] = {
"markdown.em": Style(italic=True),
"markdown.emph": Style(italic=True), # For commonmark compatibility
"markdown.strong": Style(bold=True),
"markdown.code": Style(bold=True, color=NordColors.FROST_ICE, bgcolor=NordColors.POLAR_NIGHT_ORIGIN),
"markdown.code_block": Style(color=NordColors.FROST_ICE, bgcolor=NordColors.POLAR_NIGHT_ORIGIN),
"markdown.code": Style(
bold=True, color=NordColors.FROST_ICE, bgcolor=NordColors.POLAR_NIGHT_ORIGIN
),
"markdown.code_block": Style(
color=NordColors.FROST_ICE, bgcolor=NordColors.POLAR_NIGHT_ORIGIN
),
"markdown.block_quote": Style(color=NordColors.PURPLE),
"markdown.list": Style(color=NordColors.FROST_ICE),
"markdown.item": Style(),
@ -457,7 +453,6 @@ NORD_THEME_STYLES: dict[str, Style] = {
"markdown.link": Style(color=NordColors.FROST_ICE),
"markdown.link_url": Style(color=NordColors.FROST_SKY, underline=True),
"markdown.s": Style(strike=True),
# ---------------------------------------------------------------
# ISO8601
# ---------------------------------------------------------------
@ -504,7 +499,9 @@ if __name__ == "__main__":
console.print(f"Caught error: {error}", style="red")
# Demonstrate a traceback style:
console.print("\n8) Raising and displaying a traceback with Nord styling:\n", style="bold")
console.print(
"\n8) Raising and displaying a traceback with Nord styling:\n", style="bold"
)
try:
raise ValueError("Nord test exception!")
except ValueError:

View File

@ -11,8 +11,11 @@ from typing import Any, Awaitable, Callable, TypeVar
import pythonjsonlogger.json
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import (AnyFormattedText, FormattedText,
merge_formatted_text)
from prompt_toolkit.formatted_text import (
AnyFormattedText,
FormattedText,
merge_formatted_text,
)
from rich.logging import RichHandler
from falyx.themes.colors import OneColors
@ -21,6 +24,7 @@ logger = logging.getLogger("falyx")
T = TypeVar("T")
async def _noop(*args, **kwargs):
pass
@ -68,7 +72,9 @@ def chunks(iterator, size):
async def async_confirm(message: AnyFormattedText = "Are you sure?") -> bool:
session: PromptSession = PromptSession()
while True:
merged_message: AnyFormattedText = merge_formatted_text([message, FormattedText([(OneColors.LIGHT_YELLOW_b, " [Y/n] ")])])
merged_message: AnyFormattedText = merge_formatted_text(
[message, FormattedText([(OneColors.LIGHT_YELLOW_b, " [Y/n] ")])]
)
answer: str = (await session.prompt_async(merged_message)).strip().lower()
if answer in ("y", "yes"):
return True
@ -182,7 +188,9 @@ def setup_logging(
elif mode == "json":
console_handler = logging.StreamHandler()
console_handler.setFormatter(
pythonjsonlogger.json.JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
pythonjsonlogger.json.JsonFormatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"
)
)
else:
raise ValueError(f"Invalid log mode: {mode}")
@ -194,13 +202,17 @@ def setup_logging(
file_handler.setLevel(file_log_level)
if json_log_to_file:
file_handler.setFormatter(
pythonjsonlogger.json.JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
pythonjsonlogger.json.JsonFormatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"
)
)
else:
file_handler.setFormatter(logging.Formatter(
file_handler.setFormatter(
logging.Formatter(
"%(asctime)s [%(name)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
))
datefmt="%Y-%m-%d %H:%M:%S",
)
)
root.addHandler(file_handler)
logging.getLogger("urllib3").setLevel(logging.WARNING)

View File

@ -1 +1 @@
__version__ = "0.1.6"
__version__ = "0.1.7"

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "falyx"
version = "0.1.6"
version = "0.1.7"
description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT"
@ -18,6 +18,12 @@ python-json-logger = "^3.3.0"
pytest = "^7.0"
pytest-asyncio = "^0.20"
ruff = "^0.3"
toml = "^0.10"
black = { version = "^25.0", allow-prereleases = true }
mypy = { version = "^1.0", allow-prereleases = true }
isort = { version = "^5.0", allow-prereleases = true }
pytest-cov = "^4.0"
pytest-mock = "^3.0"
[tool.poetry.scripts]
sync-version = "scripts.sync_version:main"

View File

@ -1,8 +1,10 @@
"""scripts/sync_version.py"""
import toml
from pathlib import Path
import toml
def main():
pyproject_path = Path(__file__).parent.parent / "pyproject.toml"
version_path = Path(__file__).parent.parent / "falyx" / "version.py"
@ -13,5 +15,6 @@ def main():
version_path.write_text(f'__version__ = "{version}"\n')
print(f"✅ Synced version: {version}{version_path}")
if __name__ == "__main__":
main()

View File

@ -1,15 +1,17 @@
import pytest
from falyx.action import Action, ChainedAction, LiteralInputAction, FallbackAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.action import Action, ChainedAction, FallbackAction, LiteralInputAction
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
asyncio_default_fixture_loop_scope = "function"
# --- Helpers ---
async def capturing_hook(context: ExecutionContext):
context.extra["hook_triggered"] = True
# --- Fixtures ---
@pytest.fixture(autouse=True)
def clean_registry():
@ -18,7 +20,6 @@ def clean_registry():
er.clear()
@pytest.mark.asyncio
async def test_action_callable():
"""Test if Action can be created with a callable."""
@ -26,15 +27,22 @@ async def test_action_callable():
result = await action()
assert result == "Hello, World!"
@pytest.mark.asyncio
async def test_action_async_callable():
"""Test if Action can be created with an async callable."""
async def async_callable():
return "Hello, World!"
action = Action("test_action", async_callable)
result = await action()
assert result == "Hello, World!"
assert str(action) == "Action(name='test_action', action=async_callable, args=(), kwargs={}, retry=False)"
assert (
str(action)
== "Action(name='test_action', action=async_callable, args=(), kwargs={}, retry=False)"
)
@pytest.mark.asyncio
async def test_action_non_callable():
@ -42,11 +50,15 @@ async def test_action_non_callable():
with pytest.raises(TypeError):
Action("test_action", 42)
@pytest.mark.asyncio
@pytest.mark.parametrize("return_list, expected", [
@pytest.mark.parametrize(
"return_list, expected",
[
(True, [1, 2, 3]),
(False, 3),
])
],
)
async def test_chained_action_return_modes(return_list, expected):
chain = ChainedAction(
name="Simple Chain",
@ -55,19 +67,23 @@ async def test_chained_action_return_modes(return_list, expected):
Action(name="two", action=lambda: 2),
Action(name="three", action=lambda: 3),
],
return_list=return_list
return_list=return_list,
)
result = await chain()
assert result == expected
@pytest.mark.asyncio
@pytest.mark.parametrize("return_list, auto_inject, expected", [
@pytest.mark.parametrize(
"return_list, auto_inject, expected",
[
(True, True, [1, 2, 3]),
(True, False, [1, 2, 3]),
(False, True, 3),
(False, False, 3),
])
],
)
async def test_chained_action_literals(return_list, auto_inject, expected):
chain = ChainedAction(
name="Literal Chain",
@ -79,6 +95,7 @@ async def test_chained_action_literals(return_list, auto_inject, expected):
result = await chain()
assert result == expected
@pytest.mark.asyncio
async def test_literal_input_action():
"""Test if LiteralInputAction can be created and used."""
@ -88,6 +105,7 @@ async def test_literal_input_action():
assert action.value == "Hello, World!"
assert str(action) == "LiteralInputAction(value='Hello, World!')"
@pytest.mark.asyncio
async def test_fallback_action():
"""Test if FallbackAction can be created and used."""
@ -102,4 +120,3 @@ async def test_fallback_action():
result = await chain()
assert result == "Fallback value"
assert str(action) == "FallbackAction(fallback='Fallback value')"

View File

@ -1,5 +1,6 @@
import pickle
import warnings
import pytest
from falyx.action import ProcessAction
@ -7,17 +8,21 @@ from falyx.execution_registry import ExecutionRegistry as er
# --- Fixtures ---
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
yield
er.clear()
def slow_add(x, y):
return x + y
# --- Tests ---
@pytest.mark.asyncio
async def test_process_action_executes_correctly():
with warnings.catch_warnings():
@ -27,8 +32,10 @@ async def test_process_action_executes_correctly():
result = await action()
assert result == 5
unpickleable = lambda x: x + 1
@pytest.mark.asyncio
async def test_process_action_rejects_unpickleable():
with warnings.catch_warnings():
@ -37,4 +44,3 @@ async def test_process_action_rejects_unpickleable():
action = ProcessAction(name="proc_fail", func=unpickleable, args=(2,))
with pytest.raises(pickle.PicklingError, match="Can't pickle"):
await action()

View File

@ -6,6 +6,7 @@ from falyx.retry_utils import enable_retries_recursively
asyncio_default_fixture_loop_scope = "function"
# --- Fixtures ---
@pytest.fixture(autouse=True)
def clean_registry():
@ -13,6 +14,7 @@ def clean_registry():
yield
er.clear()
def test_action_enable_retry():
"""Test if Action can be created with retry=True."""
action = Action("test_action", lambda: "Hello, World!", retry=True)

View File

@ -1,16 +1,18 @@
import pytest
from falyx.action import Action, ChainedAction, ActionGroup, FallbackAction
from falyx.action import Action, ActionGroup, ChainedAction, FallbackAction
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.context import ExecutionContext
asyncio_default_fixture_loop_scope = "function"
# --- Helpers ---
async def capturing_hook(context: ExecutionContext):
context.extra["hook_triggered"] = True
# --- Fixtures ---
@pytest.fixture
def hook_manager():
@ -18,29 +20,33 @@ def hook_manager():
hm.register(HookType.BEFORE, capturing_hook)
return hm
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
yield
er.clear()
# --- Tests ---
@pytest.mark.asyncio
async def test_action_runs_correctly():
async def dummy_action(x: int = 0) -> int: return x + 1
async def dummy_action(x: int = 0) -> int:
return x + 1
sample_action = Action(name="increment", action=dummy_action, kwargs={"x": 5})
result = await sample_action()
assert result == 6
@pytest.mark.asyncio
async def test_action_hook_lifecycle(hook_manager):
async def a1(): return 42
action = Action(
name="hooked",
action=a1,
hooks=hook_manager
)
async def a1():
return 42
action = Action(name="hooked", action=a1, hooks=hook_manager)
await action()
@ -48,28 +54,44 @@ async def test_action_hook_lifecycle(hook_manager):
assert context.name == "hooked"
assert context.extra.get("hook_triggered") is True
@pytest.mark.asyncio
async def test_chained_action_with_result_injection():
async def a1(): return 1
async def a2(last_result): return last_result + 5
async def a3(last_result): return last_result * 2
async def a1():
return 1
async def a2(last_result):
return last_result + 5
async def a3(last_result):
return last_result * 2
actions = [
Action(name="start", action=a1),
Action(name="add_last", action=a2, inject_last_result=True),
Action(name="multiply", action=a3, inject_last_result=True)
Action(name="multiply", action=a3, inject_last_result=True),
]
chain = ChainedAction(name="test_chain", actions=actions, inject_last_result=True, return_list=True)
chain = ChainedAction(
name="test_chain", actions=actions, inject_last_result=True, return_list=True
)
result = await chain()
assert result == [1, 6, 12]
chain = ChainedAction(name="test_chain", actions=actions, inject_last_result=True)
result = await chain()
assert result == 12
@pytest.mark.asyncio
async def test_action_group_runs_in_parallel():
async def a1(): return 1
async def a2(): return 2
async def a3(): return 3
async def a1():
return 1
async def a2():
return 2
async def a3():
return 3
actions = [
Action(name="a", action=a1),
Action(name="b", action=a2),
@ -80,10 +102,15 @@ async def test_action_group_runs_in_parallel():
result_dict = dict(result)
assert result_dict == {"a": 1, "b": 2, "c": 3}
@pytest.mark.asyncio
async def test_chained_action_inject_from_action():
async def a1(last_result): return last_result + 10
async def a2(last_result): return last_result + 5
async def a1(last_result):
return last_result + 10
async def a2(last_result):
return last_result + 5
inner_chain = ChainedAction(
name="inner_chain",
actions=[
@ -92,8 +119,13 @@ async def test_chained_action_inject_from_action():
],
return_list=True,
)
async def a3(): return 1
async def a4(last_result): return last_result + 2
async def a3():
return 1
async def a4(last_result):
return last_result + 2
actions = [
Action(name="first", action=a3),
Action(name="second", action=a4, inject_last_result=True),
@ -103,21 +135,33 @@ async def test_chained_action_inject_from_action():
result = await outer_chain()
assert result == [1, 3, [13, 18]]
@pytest.mark.asyncio
async def test_chained_action_with_group():
async def a1(last_result): return last_result + 1
async def a2(last_result): return last_result + 2
async def a3(): return 3
async def a1(last_result):
return last_result + 1
async def a2(last_result):
return last_result + 2
async def a3():
return 3
group = ActionGroup(
name="group",
actions=[
Action(name="a", action=a1, inject_last_result=True),
Action(name="b", action=a2, inject_last_result=True),
Action(name="c", action=a3),
]
],
)
async def a4(): return 1
async def a5(last_result): return last_result + 2
async def a4():
return 1
async def a5(last_result):
return last_result + 2
actions = [
Action(name="first", action=a4),
Action(name="second", action=a5, inject_last_result=True),
@ -127,6 +171,7 @@ async def test_chained_action_with_group():
result = await chain()
assert result == [1, 3, [("a", 4), ("b", 5), ("c", 3)]]
@pytest.mark.asyncio
async def test_action_error_triggers_error_hook():
def fail():
@ -146,6 +191,7 @@ async def test_action_error_triggers_error_hook():
assert flag.get("called") is True
@pytest.mark.asyncio
async def test_chained_action_rollback_on_failure():
rollback_called = []
@ -161,7 +207,7 @@ async def test_chained_action_rollback_on_failure():
actions = [
Action(name="ok", action=success, rollback=rollback_fn),
Action(name="fail", action=fail, rollback=rollback_fn)
Action(name="fail", action=fail, rollback=rollback_fn),
]
chain = ChainedAction(name="chain", actions=actions)
@ -171,13 +217,17 @@ async def test_chained_action_rollback_on_failure():
assert rollback_called == ["rolled back"]
@pytest.mark.asyncio
async def test_register_hooks_recursively_propagates():
def hook(context):
context.extra.update({"test_marker": True})
async def a1(): return 1
async def a2(): return 2
async def a1():
return 1
async def a2():
return 2
chain = ChainedAction(
name="chain",
@ -193,6 +243,7 @@ async def test_register_hooks_recursively_propagates():
for ctx in er.get_by_name("a") + er.get_by_name("b"):
assert ctx.extra.get("test_marker") is True
@pytest.mark.asyncio
async def test_action_hook_recovers_error():
async def flaky():
@ -209,15 +260,26 @@ async def test_action_hook_recovers_error():
result = await action()
assert result == 99
@pytest.mark.asyncio
async def test_action_group_injects_last_result():
async def a1(last_result): return last_result + 10
async def a2(last_result): return last_result + 20
group = ActionGroup(name="group", actions=[
async def a1(last_result):
return last_result + 10
async def a2(last_result):
return last_result + 20
group = ActionGroup(
name="group",
actions=[
Action(name="g1", action=a1, inject_last_result=True),
Action(name="g2", action=a2, inject_last_result=True),
])
async def a3(): return 5
],
)
async def a3():
return 5
chain = ChainedAction(
name="with_group",
actions=[
@ -230,20 +292,30 @@ async def test_action_group_injects_last_result():
result_dict = dict(result[1])
assert result_dict == {"g1": 15, "g2": 25}
@pytest.mark.asyncio
async def test_action_inject_last_result():
async def a1(): return 1
async def a2(last_result): return last_result + 1
async def a1():
return 1
async def a2(last_result):
return last_result + 1
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2, inject_last_result=True)
chain = ChainedAction(name="chain", actions=[a1, a2])
result = await chain()
assert result == 2
@pytest.mark.asyncio
async def test_action_inject_last_result_fail():
async def a1(): return 1
async def a2(last_result): return last_result + 1
async def a1():
return 1
async def a2(last_result):
return last_result + 1
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2)
chain = ChainedAction(name="chain", actions=[a1, a2])
@ -253,54 +325,82 @@ async def test_action_inject_last_result_fail():
assert "last_result" in str(exc_info.value)
@pytest.mark.asyncio
async def test_chained_action_auto_inject():
async def a1(): return 1
async def a2(last_result): return last_result + 2
async def a1():
return 1
async def a2(last_result):
return last_result + 2
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2)
chain = ChainedAction(name="chain", actions=[a1, a2], auto_inject=True, return_list=True)
chain = ChainedAction(
name="chain", actions=[a1, a2], auto_inject=True, return_list=True
)
result = await chain()
assert result == [1, 3] # a2 receives last_result=1
@pytest.mark.asyncio
async def test_chained_action_no_auto_inject():
async def a1(): return 1
async def a2(): return 2
async def a1():
return 1
async def a2():
return 2
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2)
chain = ChainedAction(name="no_inject", actions=[a1, a2], auto_inject=False, return_list=True)
chain = ChainedAction(
name="no_inject", actions=[a1, a2], auto_inject=False, return_list=True
)
result = await chain()
assert result == [1, 2] # a2 does not receive 1
@pytest.mark.asyncio
async def test_chained_action_auto_inject_after_first():
async def a1(): return 1
async def a2(last_result): return last_result + 1
async def a1():
return 1
async def a2(last_result):
return last_result + 1
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2)
chain = ChainedAction(name="auto_inject", actions=[a1, a2], auto_inject=True)
result = await chain()
assert result == 2 # a2 receives last_result=1
@pytest.mark.asyncio
async def test_chained_action_with_literal_input():
async def a1(last_result): return last_result + " world"
async def a1(last_result):
return last_result + " world"
a1 = Action(name="a1", action=a1)
chain = ChainedAction(name="literal_inject", actions=["hello", a1], auto_inject=True)
result = await chain()
assert result == "hello world" # "hello" is injected as last_result
@pytest.mark.asyncio
async def test_chained_action_manual_inject_override():
async def a1(): return 10
async def a2(last_result): return last_result * 2
async def a1():
return 10
async def a2(last_result):
return last_result * 2
a1 = Action(name="a1", action=a1)
a2 = Action(name="a2", action=a2, inject_last_result=True)
chain = ChainedAction(name="manual_override", actions=[a1, a2], auto_inject=False)
result = await chain()
assert result == 20 # Even without auto_inject, a2 still gets last_result
@pytest.mark.asyncio
async def test_chained_action_with_mid_literal():
async def fetch_data():
@ -330,6 +430,7 @@ async def test_chained_action_with_mid_literal():
result = await chain()
assert result == [None, "default_value", "default_value", "Enriched: default_value"]
@pytest.mark.asyncio
async def test_chained_action_with_mid_fallback():
async def fetch_data():
@ -389,15 +490,22 @@ async def test_chained_action_with_success_mid_fallback():
result = await chain()
assert result == ["Result", "Result", "Result", "Enriched: Result"]
@pytest.mark.asyncio
async def test_action_group_partial_failure():
async def succeed(): return "ok"
async def fail(): raise ValueError("oops")
async def succeed():
return "ok"
group = ActionGroup(name="partial_group", actions=[
async def fail():
raise ValueError("oops")
group = ActionGroup(
name="partial_group",
actions=[
Action(name="succeed_action", action=succeed),
Action(name="fail_action", action=fail),
])
],
)
with pytest.raises(Exception) as exc_info:
await group()
@ -406,10 +514,15 @@ async def test_action_group_partial_failure():
assert er.get_by_name("fail_action")[0].exception is not None
assert "fail_action" in str(exc_info.value)
@pytest.mark.asyncio
async def test_chained_action_with_nested_group():
async def g1(last_result): return last_result + "10"
async def g2(last_result): return last_result + "20"
async def g1(last_result):
return last_result + "10"
async def g2(last_result):
return last_result + "20"
group = ActionGroup(
name="nested_group",
actions=[
@ -431,7 +544,11 @@ async def test_chained_action_with_nested_group():
result = await chain()
# "start" -> group both receive "start" as last_result
assert result[0] == "start"
assert dict(result[1]) == {"g1": "start10", "g2": "start20"} # Assuming string concatenation for example
assert dict(result[1]) == {
"g1": "start10",
"g2": "start20",
} # Assuming string concatenation for example
@pytest.mark.asyncio
async def test_chained_action_double_fallback():
@ -461,5 +578,11 @@ async def test_chained_action_double_fallback():
)
result = await chain()
assert result == [None, "default1", "default1", None, "default2", "Enriched: default2"]
assert result == [
None,
"default1",
"default1",
None,
"default2",
"Enriched: default2",
]

View File

@ -3,6 +3,7 @@ import pytest
from falyx.action import ChainedAction
from falyx.exceptions import EmptyChainError
@pytest.mark.asyncio
async def test_chained_action_raises_empty_chain_error_when_no_actions():
"""A ChainedAction with no actions should raise an EmptyChainError immediately."""
@ -14,6 +15,7 @@ async def test_chained_action_raises_empty_chain_error_when_no_actions():
assert "No actions to execute." in str(exc_info.value)
assert "empty_chain" in str(exc_info.value)
@pytest.mark.asyncio
async def test_chained_action_raises_empty_chain_error_when_actions_are_none():
"""A ChainedAction with None as actions should raise an EmptyChainError immediately."""
@ -24,4 +26,3 @@ async def test_chained_action_raises_empty_chain_error_when_actions_are_none():
assert "No actions to execute." in str(exc_info.value)
assert "none_chain" in str(exc_info.value)

View File

@ -3,12 +3,13 @@ import pytest
from falyx.action import Action, ActionGroup, ChainedAction
from falyx.command import Command
from falyx.io_action import BaseIOAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.io_action import BaseIOAction
from falyx.retry import RetryPolicy
asyncio_default_fixture_loop_scope = "function"
# --- Fixtures ---
@pytest.fixture(autouse=True)
def clean_registry():
@ -16,10 +17,12 @@ def clean_registry():
yield
er.clear()
# --- Dummy Action ---
async def dummy_action():
return "ok"
# --- Dummy IO Action ---
class DummyInputAction(BaseIOAction):
async def _run(self, *args, **kwargs):
@ -28,46 +31,46 @@ class DummyInputAction(BaseIOAction):
async def preview(self, parent=None):
pass
# --- Tests ---
def test_command_creation():
"""Test if Command can be created with a callable."""
action = Action("test_action", dummy_action)
cmd = Command(
key="TEST",
description="Test Command",
action=action
)
cmd = Command(key="TEST", description="Test Command", action=action)
assert cmd.key == "TEST"
assert cmd.description == "Test Command"
assert cmd.action == action
def test_command_str():
"""Test if Command string representation is correct."""
action = Action("test_action", dummy_action)
cmd = Command(
key="TEST",
description="Test Command",
action=action
)
cmd = Command(key="TEST", description="Test Command", action=action)
print(cmd)
assert str(cmd) == "Command(key='TEST', description='Test Command' action='Action(name='test_action', action=dummy_action, args=(), kwargs={}, retry=False)')"
assert (
str(cmd)
== "Command(key='TEST', description='Test Command' action='Action(name='test_action', action=dummy_action, args=(), kwargs={}, retry=False)')"
)
@pytest.mark.parametrize(
"action_factory, expected_requires_input",
[
(lambda: Action(name="normal", action=dummy_action), False),
(lambda: DummyInputAction(name="io"), True),
(lambda: ChainedAction(name="chain", actions=[DummyInputAction(name="io")]), True),
(lambda: ActionGroup(name="group", actions=[DummyInputAction(name="io")]), True),
]
(
lambda: ChainedAction(name="chain", actions=[DummyInputAction(name="io")]),
True,
),
(
lambda: ActionGroup(name="group", actions=[DummyInputAction(name="io")]),
True,
),
],
)
def test_command_requires_input_detection(action_factory, expected_requires_input):
action = action_factory()
cmd = Command(
key="TEST",
description="Test Command",
action=action
)
cmd = Command(key="TEST", description="Test Command", action=action)
assert cmd.requires_input == expected_requires_input
if expected_requires_input:
@ -75,6 +78,7 @@ def test_command_requires_input_detection(action_factory, expected_requires_inpu
else:
assert cmd.hidden is False
def test_requires_input_flag_detected_for_baseioaction():
"""Command should automatically detect requires_input=True for BaseIOAction."""
cmd = Command(
@ -85,6 +89,7 @@ def test_requires_input_flag_detected_for_baseioaction():
assert cmd.requires_input is True
assert cmd.hidden is True
def test_requires_input_manual_override():
"""Command manually set requires_input=False should not auto-hide."""
cmd = Command(
@ -96,6 +101,7 @@ def test_requires_input_manual_override():
assert cmd.requires_input is False
assert cmd.hidden is False
def test_default_command_does_not_require_input():
"""Normal Command without IO Action should not require input."""
cmd = Command(
@ -106,6 +112,7 @@ def test_default_command_does_not_require_input():
assert cmd.requires_input is False
assert cmd.hidden is False
def test_chain_requires_input():
"""If first action in a chain requires input, the command should require input."""
chain = ChainedAction(
@ -123,6 +130,7 @@ def test_chain_requires_input():
assert cmd.requires_input is True
assert cmd.hidden is True
def test_group_requires_input():
"""If any action in a group requires input, the command should require input."""
group = ActionGroup(
@ -155,6 +163,7 @@ def test_enable_retry():
assert cmd.retry is True
assert cmd.action.retry_policy.enabled is True
def test_enable_retry_with_retry_policy():
"""Command should enable retry if action is an Action and retry_policy is set."""
retry_policy = RetryPolicy(
@ -175,6 +184,7 @@ def test_enable_retry_with_retry_policy():
assert cmd.action.retry_policy.enabled is True
assert cmd.action.retry_policy == retry_policy
def test_enable_retry_not_action():
"""Command should not enable retry if action is not an Action."""
cmd = Command(
@ -188,6 +198,7 @@ def test_enable_retry_not_action():
assert cmd.action.retry_policy.enabled is False
assert "'function' object has no attribute 'retry_policy'" in str(exc_info.value)
def test_chain_retry_all():
"""retry_all should retry all Actions inside a ChainedAction recursively."""
chain = ChainedAction(
@ -209,6 +220,7 @@ def test_chain_retry_all():
assert chain.actions[0].retry_policy.enabled is True
assert chain.actions[1].retry_policy.enabled is True
def test_chain_retry_all_not_base_action():
"""retry_all should not be set if action is not a ChainedAction."""
cmd = Command(
@ -221,4 +233,3 @@ def test_chain_retry_all_not_base_action():
with pytest.raises(Exception) as exc_info:
assert cmd.action.retry_policy.enabled is False
assert "'function' object has no attribute 'retry_policy'" in str(exc_info.value)

View File

@ -1,9 +1,11 @@
import pytest
import asyncio
from falyx.action import Action, ChainedAction, ActionGroup, FallbackAction
import pytest
from falyx.action import Action, ActionGroup, ChainedAction, FallbackAction
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType
from falyx.context import ExecutionContext
# --- Fixtures ---