Add retry_utils, update docstrings, add tests

This commit is contained in:
Roland Thomas Jr 2025-05-01 00:35:36 -04:00
parent b9529d85ce
commit 4b1a9ef718
Signed by: roland
GPG Key ID: 7C3C2B085A4C2872
10 changed files with 205 additions and 52 deletions

View File

@ -123,31 +123,14 @@ class BaseAction(ABC):
"""Register a hook for all actions and sub-actions."""
self.hooks.register(hook_type, hook)
@classmethod
def enable_retries_recursively(cls, action: BaseAction, policy: RetryPolicy | None):
if not policy:
policy = RetryPolicy(enabled=True)
if isinstance(action, Action):
action.retry_policy = policy
action.retry_policy.enabled = True
action.hooks.register(HookType.ON_ERROR, RetryHandler(policy).retry_on_error)
if hasattr(action, "actions"):
for sub in action.actions:
cls.enable_retries_recursively(sub, policy)
async def _write_stdout(self, data: str) -> None:
"""Override in subclasses that produce terminal output."""
pass
def requires_io_injection(self) -> bool:
"""Checks to see if the action requires input injection."""
return self._requires_injection
def __str__(self):
return f"{self.__class__.__name__}('{self.name}')"
def __repr__(self):
def __repr__(self) -> str:
return str(self)
@ -170,7 +153,7 @@ class Action(BaseAction):
hooks (HookManager, optional): Hook manager for lifecycle events.
inject_last_result (bool, optional): Enable last_result injection.
inject_last_result_as (str, optional): Name of injected key.
retry (bool, optional): Whether to enable retries.
retry (bool, optional): Enable retry logic.
retry_policy (RetryPolicy, optional): Retry settings.
"""
def __init__(
@ -207,7 +190,7 @@ class Action(BaseAction):
def enable_retry(self):
"""Enable retry with the existing retry policy."""
self.retry_policy.enable_policy()
logger.debug(f"[Action:{self.name}] Registering retry handler")
logger.debug("[%s] Registering retry handler", self.name)
handler = RetryHandler(self.retry_policy)
self.hooks.register(HookType.ON_ERROR, handler.retry_on_error)
@ -263,7 +246,10 @@ class Action(BaseAction):
self.console.print(Tree("".join(label)))
def __str__(self):
return f"Action(name={self.name}, action={self.action.__name__})"
return (
f"Action(name={self.name!r}, action={getattr(self._action, '__name__', repr(self._action))}, "
f"args={self.args!r}, kwargs={self.kwargs!r}, retry={self.retry_policy.enabled})"
)
class LiteralInputAction(Action):
@ -290,7 +276,7 @@ class LiteralInputAction(Action):
return self._value
def __str__(self) -> str:
return f"LiteralInputAction(value={self.value})"
return f"LiteralInputAction(value={self.value!r})"
class FallbackAction(Action):
@ -319,7 +305,7 @@ class FallbackAction(Action):
return self._fallback
def __str__(self) -> str:
return f"FallbackAction(fallback={self.fallback})"
return f"FallbackAction(fallback={self.fallback!r})"
class ActionListMixin:
@ -504,7 +490,10 @@ class ChainedAction(BaseAction, ActionListMixin):
action.register_hooks_recursively(hook_type, hook)
def __str__(self):
return f"ChainedAction(name={self.name}, actions={self.actions})"
return (
f"ChainedAction(name={self.name!r}, actions={[a.name for a in self.actions]!r}, "
f"auto_inject={self.auto_inject}, return_list={self.return_list})"
)
class ActionGroup(BaseAction, ActionListMixin):
@ -619,7 +608,10 @@ class ActionGroup(BaseAction, ActionListMixin):
action.register_hooks_recursively(hook_type, hook)
def __str__(self):
return f"ActionGroup(name={self.name}, actions={self.actions})"
return (
f"ActionGroup(name={self.name!r}, actions={[a.name for a in self.actions]!r}, "
f"inject_last_result={self.inject_last_result})"
)
class ProcessAction(BaseAction):
@ -716,3 +708,8 @@ class ProcessAction(BaseAction):
except (pickle.PicklingError, TypeError):
return False
def __str__(self) -> str:
return (
f"ProcessAction(name={self.name!r}, func={getattr(self.func, '__name__', repr(self.func))}, "
f"args={self.args!r}, kwargs={self.kwargs!r})"
)

View File

@ -33,6 +33,7 @@ from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.io_action import BaseIOAction
from falyx.retry import RetryPolicy
from falyx.retry_utils import enable_retries_recursively
from falyx.themes.colors import OneColors
from falyx.utils import _noop, ensure_async, logger
@ -129,7 +130,7 @@ class Command(BaseModel):
logger.warning(f"[Command:{self.key}] Retry requested, but action is not an Action instance.")
if self.retry_all and isinstance(self.action, BaseAction):
self.retry_policy.enabled = True
self.action.enable_retries_recursively(self.action, self.retry_policy)
enable_retries_recursively(self.action, self.retry_policy)
elif self.retry_all:
logger.warning(f"[Command:{self.key}] Retry all requested, but action is not a BaseAction instance.")

View File

@ -9,7 +9,6 @@ from falyx.themes.colors import OneColors
from falyx.utils import logger
class ResultReporter:
def __init__(self, formatter: Callable[[], str] | None = None):
"""

View File

@ -1,3 +1,13 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""http_action.py
Defines an Action subclass for making HTTP requests using aiohttp within Falyx workflows.
Features:
- Automatic reuse of aiohttp.ClientSession via SharedContext
- JSON, query param, header, and body support
- Retry integration and last_result injection
- Clean resource teardown using hooks
"""
from typing import Any
import aiohttp
@ -22,10 +32,32 @@ async def close_shared_http_session(context: ExecutionContext) -> None:
class HTTPAction(Action):
"""
Specialized Action that performs an HTTP request using aiohttp and the shared context.
An Action for executing HTTP requests using aiohttp with shared session reuse.
Automatically reuses a shared aiohttp.ClientSession stored in SharedContext.
Closes the session at the end of the ActionGroup (via an after-hook).
This action integrates seamlessly into Falyx pipelines, with automatic session management,
result injection, and lifecycle hook support. It is ideal for CLI-driven API workflows
where you need to call remote services and process their responses.
Features:
- Uses aiohttp for asynchronous HTTP requests
- Reuses a shared session via SharedContext to reduce connection overhead
- Automatically closes the session at the end of an ActionGroup (if applicable)
- Supports GET, POST, PUT, DELETE, etc. with full header, query, body support
- Retry and result injection compatible
Args:
name (str): Name of the action.
method (str): HTTP method (e.g., 'GET', 'POST').
url (str): The request URL.
headers (dict[str, str], optional): Request headers.
params (dict[str, Any], optional): URL query parameters.
json (dict[str, Any], optional): JSON body to send.
data (Any, optional): Raw data or form-encoded body.
hooks (HookManager, optional): Hook manager for lifecycle events.
inject_last_result (bool): Enable last_result injection.
inject_last_result_as (str): Name of injected key.
retry (bool): Enable retry logic.
retry_policy (RetryPolicy): Retry settings.
"""
def __init__(
self,
@ -107,3 +139,10 @@ class HTTPAction(Action):
parent.add("".join(label))
else:
self.console.print(Tree("".join(label)))
def __str__(self):
return (
f"HTTPAction(name={self.name!r}, method={self.method!r}, url={self.url!r}, "
f"headers={self.headers!r}, params={self.params!r}, json={self.json!r}, data={self.data!r}, "
f"retry={self.retry_policy.enabled}, inject_last_result={self.inject_last_result})"
)

View File

@ -1,5 +1,20 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""io_action.py"""
"""io_action.py
BaseIOAction: A base class for stream- or buffer-based IO-driven Actions.
This module defines `BaseIOAction`, a specialized variant of `BaseAction`
that interacts with standard input and output, enabling command-line pipelines,
text filters, and stream processing tasks.
Features:
- Supports buffered or streaming input modes.
- Reads from stdin and writes to stdout automatically.
- Integrates with lifecycle hooks and retry logic.
- Subclasses must implement `from_input()`, `to_output()`, and `_run()`.
Common usage includes shell-like filters, input transformers, or any tool that
needs to consume input from another process or pipeline.
"""
import asyncio
import subprocess
import sys
@ -20,6 +35,29 @@ console = Console()
class BaseIOAction(BaseAction):
"""
Base class for IO-driven Actions that operate on stdin/stdout input streams.
Designed for use in shell pipelines or programmatic workflows that pass data
through chained commands. It handles reading input, transforming it, and
emitting output either as a one-time buffered operation or line-by-line streaming.
Core responsibilities:
- Reads input from stdin or previous action result.
- Supports buffered or streaming modes via `mode`.
- Parses input via `from_input()` and formats output via `to_output()`.
- Executes `_run()` with the parsed input.
- Writes output to stdout.
Subclasses must implement:
- `from_input(raw)`: Convert raw stdin or injected data into typed input.
- `to_output(data)`: Convert result into output string or bytes.
- `_run(parsed_input, *args, **kwargs)`: Core execution logic.
Attributes:
mode (str): Either "buffered" or "stream". Controls input behavior.
inject_last_result (bool): Whether to inject shared context input.
"""
def __init__(
self,
name: str,
@ -53,8 +91,8 @@ class BaseIOAction(BaseAction):
if last_result is not None:
return last_result
if self.inject_last_result and self.results_context:
return self.results_context.last_result()
if self.inject_last_result and self.shared_context:
return self.shared_context.last_result()
logger.debug("[%s] No input provided and no last result found for injection.", self.name)
raise FalyxError("No input provided and no last result to inject.")
@ -149,6 +187,32 @@ class UppercaseIO(BaseIOAction):
class ShellAction(BaseIOAction):
"""
ShellAction wraps a shell command template for CLI pipelines.
This Action takes parsed input (from stdin, literal, or last_result),
substitutes it into the provided shell command template, and executes
the command asynchronously using subprocess.
Designed for quick integration with shell tools like `grep`, `ping`, `jq`, etc.
Warning:
Be cautious when using ShellAction with untrusted user input. Since it uses
`shell=True`, unsanitized input can lead to command injection vulnerabilities.
Avoid passing raw user input directly unless the template or use case is secure.
Features:
- Automatically handles input parsing (str/bytes)
- Captures stdout and stderr from shell execution
- Raises on non-zero exit codes with stderr as the error
- Result is returned as trimmed stdout string
- Compatible with ChainedAction and Command.requires_input detection
Args:
name (str): Name of the action.
command_template (str): Shell command to execute. Must include `{}` to include input.
If no placeholder is present, the input is not included.
"""
def __init__(self, name: str, command_template: str, **kwargs):
super().__init__(name=name, **kwargs)
self.command_template = command_template
@ -159,7 +223,7 @@ class ShellAction(BaseIOAction):
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
async def _run(self, parsed_input: str) -> str:
# Replace placeholder in template, or use raw input ddas full command
# Replace placeholder in template, or use raw input as full command
command = self.command_template.format(parsed_input)
result = subprocess.run(
command, shell=True, text=True, capture_output=True
@ -180,6 +244,9 @@ class ShellAction(BaseIOAction):
else:
console.print(Tree("".join(label)))
def __str__(self):
return f"ShellAction(name={self.name!r}, command_template={self.command_template!r})"
class GrepAction(BaseIOAction):
def __init__(self, name: str, pattern: str, **kwargs):
super().__init__(name=name, **kwargs)

View File

@ -1,5 +1,7 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""retry.py"""
from __future__ import annotations
import asyncio
import random

16
falyx/retry_utils.py Normal file
View File

@ -0,0 +1,16 @@
from falyx.action import Action, BaseAction
from falyx.hook_manager import HookType
from falyx.retry import RetryHandler, RetryPolicy
def enable_retries_recursively(action: BaseAction, policy: RetryPolicy | None):
if not policy:
policy = RetryPolicy(enabled=True)
if isinstance(action, Action):
action.retry_policy = policy
action.retry_policy.enabled = True
action.hooks.register(HookType.ON_ERROR, RetryHandler(policy).retry_on_error)
if hasattr(action, "actions"):
for sub in action.actions:
enable_retries_recursively(sub, policy)

View File

@ -1,8 +1,7 @@
import pytest
from falyx.action import Action, ChainedAction, ActionGroup, FallbackAction
from falyx.action import Action, ChainedAction, LiteralInputAction, FallbackAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.context import ExecutionContext
asyncio_default_fixture_loop_scope = "function"
@ -18,6 +17,8 @@ def clean_registry():
yield
er.clear()
@pytest.mark.asyncio
async def test_action_callable():
"""Test if Action can be created with a callable."""
@ -33,6 +34,7 @@ async def test_action_async_callable():
action = Action("test_action", async_callable)
result = await action()
assert result == "Hello, World!"
assert str(action) == "Action(name='test_action', action=async_callable, args=(), kwargs={}, retry=False)"
@pytest.mark.asyncio
async def test_action_non_callable():
@ -76,3 +78,28 @@ async def test_chained_action_literals(return_list, auto_inject, expected):
result = await chain()
assert result == expected
@pytest.mark.asyncio
async def test_literal_input_action():
"""Test if LiteralInputAction can be created and used."""
action = LiteralInputAction("Hello, World!")
result = await action()
assert result == "Hello, World!"
assert action.value == "Hello, World!"
assert str(action) == "LiteralInputAction(value='Hello, World!')"
@pytest.mark.asyncio
async def test_fallback_action():
"""Test if FallbackAction can be created and used."""
action = FallbackAction("Fallback value")
chain = ChainedAction(
name="Fallback Chain",
actions=[
Action(name="one", action=lambda: None),
action,
],
)
result = await chain()
assert result == "Fallback value"
assert str(action) == "FallbackAction(fallback='Fallback value')"

View File

@ -1,23 +1,12 @@
import pytest
from falyx.action import Action, ChainedAction, ActionGroup, FallbackAction
from falyx.action import Action, ChainedAction
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.context import ExecutionContext
from falyx.retry_utils import enable_retries_recursively
asyncio_default_fixture_loop_scope = "function"
# --- Helpers ---
async def capturing_hook(context: ExecutionContext):
context.extra["hook_triggered"] = True
# --- Fixtures ---
@pytest.fixture
def hook_manager():
hm = HookManager()
hm.register(HookType.BEFORE, capturing_hook)
return hm
@pytest.fixture(autouse=True)
def clean_registry():
er.clear()
@ -28,3 +17,18 @@ def test_action_enable_retry():
"""Test if Action can be created with retry=True."""
action = Action("test_action", lambda: "Hello, World!", retry=True)
assert action.retry_policy.enabled is True
@pytest.mark.asyncio
async def test_enable_retries_recursively():
"""Test if Action can be created with retry=True."""
action = Action("test_action", lambda: "Hello, World!")
assert action.retry_policy.enabled is False
chained_action = ChainedAction(
name="Chained Action",
actions=[action],
)
enable_retries_recursively(chained_action, policy=None)
assert action.retry_policy.enabled is True

View File

@ -49,7 +49,8 @@ def test_command_str():
description="Test Command",
action=action
)
assert str(cmd) == "Command(key='TEST', description='Test Command' action='Action(name=test_action, action=dummy_action)')"
print(cmd)
assert str(cmd) == "Command(key='TEST', description='Test Command' action='Action(name='test_action', action=dummy_action, args=(), kwargs={}, retry=False)')"
@pytest.mark.parametrize(
"action_factory, expected_requires_input",