7 Commits

32 changed files with 861 additions and 301 deletions

View File

@ -0,0 +1,39 @@
import asyncio
from falyx import Action, ActionGroup, Command, Falyx
# Define a shared async function
async def say_hello(name: str, excited: bool = False):
if excited:
print(f"Hello, {name}!!!")
else:
print(f"Hello, {name}.")
# Wrap the same callable in multiple Actions
action1 = Action("say_hello_1", action=say_hello)
action2 = Action("say_hello_2", action=say_hello)
action3 = Action("say_hello_3", action=say_hello)
# Combine into an ActionGroup
group = ActionGroup(name="greet_group", actions=[action1, action2, action3])
# Create the Command with auto_args=True
cmd = Command(
key="G",
description="Greet someone with multiple variations.",
action=group,
arg_metadata={
"name": {
"help": "The name of the person to greet.",
},
"excited": {
"help": "Whether to greet excitedly.",
},
},
)
flx = Falyx("Test Group")
flx.add_command_from_command(cmd)
asyncio.run(flx.run())

View File

@ -0,0 +1,54 @@
import asyncio
from falyx import Action, ChainedAction, Falyx
from falyx.utils import setup_logging
setup_logging()
async def deploy(service: str, region: str = "us-east-1", verbose: bool = False) -> str:
if verbose:
print(f"Deploying {service} to {region}...")
await asyncio.sleep(2)
if verbose:
print(f"{service} deployed successfully!")
return f"{service} deployed to {region}"
flx = Falyx("Deployment CLI")
flx.add_command(
key="D",
aliases=["deploy"],
description="Deploy a service to a specified region.",
action=Action(
name="deploy_service",
action=deploy,
),
arg_metadata={
"service": "Service name",
"region": {"help": "Deployment region", "choices": ["us-east-1", "us-west-2"]},
"verbose": {"help": "Enable verbose mode"},
},
)
deploy_chain = ChainedAction(
name="DeployChain",
actions=[
Action(name="deploy_service", action=deploy),
Action(
name="notify",
action=lambda last_result: print(f"Notification: {last_result}"),
),
],
auto_inject=True,
)
flx.add_command(
key="N",
aliases=["notify"],
description="Deploy a service and notify.",
action=deploy_chain,
)
asyncio.run(flx.run())

View File

@ -4,7 +4,6 @@ from rich.console import Console
from falyx import ActionGroup, Falyx
from falyx.action import HTTPAction
from falyx.hook_manager import HookType
from falyx.hooks import ResultReporter
console = Console()
@ -49,7 +48,7 @@ action_group = ActionGroup(
reporter = ResultReporter()
action_group.hooks.register(
HookType.ON_SUCCESS,
"on_success",
reporter.report,
)

View File

@ -3,7 +3,6 @@ import asyncio
from falyx import Action, ActionGroup, ChainedAction
from falyx import ExecutionRegistry as er
from falyx import ProcessAction
from falyx.hook_manager import HookType
from falyx.retry import RetryHandler, RetryPolicy
@ -47,7 +46,7 @@ def build_pipeline():
checkout = Action("Checkout", checkout_code)
analysis = ProcessAction("Static Analysis", run_static_analysis)
tests = Action("Run Tests", flaky_tests)
tests.hooks.register(HookType.ON_ERROR, retry_handler.retry_on_error)
tests.hooks.register("on_error", retry_handler.retry_on_error)
# Parallel deploys
deploy_group = ActionGroup(

View File

@ -1,22 +1,26 @@
import asyncio
from falyx.selection import (
SelectionOption,
prompt_for_selection,
render_selection_dict_table,
)
from falyx.action import SelectionAction
from falyx.selection import SelectionOption
menu = {
"A": SelectionOption("Run diagnostics", lambda: print("Running diagnostics...")),
"B": SelectionOption("Deploy to staging", lambda: print("Deploying...")),
selections = {
"1": SelectionOption(
description="Production", value="3bc2616e-3696-11f0-a139-089204eb86ac"
),
"2": SelectionOption(
description="Staging", value="42f2cd84-3696-11f0-a139-089204eb86ac"
),
}
table = render_selection_dict_table(
title="Main Menu",
selections=menu,
select = SelectionAction(
name="Select Deployment",
selections=selections,
title="Select a Deployment",
columns=2,
prompt_message="> ",
return_type="value",
show_table=True,
)
key = asyncio.run(prompt_for_selection(menu.keys(), table))
print(f"You selected: {key}")
menu[key.upper()].value()
print(asyncio.run(select()))

View File

@ -3,7 +3,6 @@ import asyncio
from falyx import Action, ChainedAction, Falyx
from falyx.action import ShellAction
from falyx.hook_manager import HookType
from falyx.hooks import ResultReporter
from falyx.utils import setup_logging
@ -42,12 +41,12 @@ reporter = ResultReporter()
a1 = Action("a1", a1, inject_last_result=True)
a1.hooks.register(
HookType.ON_SUCCESS,
"on_success",
reporter.report,
)
a2 = Action("a2", a2, inject_last_result=True)
a2.hooks.register(
HookType.ON_SUCCESS,
"on_success",
reporter.report,
)

View File

@ -12,7 +12,6 @@ from .command import Command
from .context import ExecutionContext, SharedContext
from .execution_registry import ExecutionRegistry
from .falyx import Falyx
from .hook_manager import HookType
logger = logging.getLogger("falyx")

0
falyx/action/.pytyped Normal file
View File

View File

@ -47,6 +47,7 @@ from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.logger import logger
from falyx.options_manager import OptionsManager
from falyx.parsers.utils import same_argument_definitions
from falyx.retry import RetryHandler, RetryPolicy
from falyx.themes import OneColors
from falyx.utils import ensure_async
@ -61,8 +62,7 @@ class BaseAction(ABC):
inject_last_result (bool): Whether to inject the previous action's result
into kwargs.
inject_into (str): The name of the kwarg key to inject the result as
(default: 'last_result').
_requires_injection (bool): Whether the action requires input injection.
(default: 'last_result').
"""
def __init__(
@ -82,7 +82,6 @@ class BaseAction(ABC):
self.inject_last_result: bool = inject_last_result
self.inject_into: str = inject_into
self._never_prompt: bool = never_prompt
self._requires_injection: bool = False
self._skip_in_chain: bool = False
self.console = Console(color_system="auto")
self.options_manager: OptionsManager | None = None
@ -101,6 +100,14 @@ class BaseAction(ABC):
async def preview(self, parent: Tree | None = None):
raise NotImplementedError("preview must be implemented by subclasses")
@abstractmethod
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
"""
Returns the callable to be used for argument inference.
By default, it returns None.
"""
raise NotImplementedError("get_infer_target must be implemented by subclasses")
def set_options_manager(self, options_manager: OptionsManager) -> None:
self.options_manager = options_manager
@ -154,10 +161,6 @@ class BaseAction(ABC):
async def _write_stdout(self, data: str) -> None:
"""Override in subclasses that produce terminal output."""
def requires_io_injection(self) -> bool:
"""Checks to see if the action requires input injection."""
return self._requires_injection
def __repr__(self) -> str:
return str(self)
@ -246,6 +249,13 @@ class Action(BaseAction):
if policy.enabled:
self.enable_retry()
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
"""
Returns the callable to be used for argument inference.
By default, it returns the action itself.
"""
return self.action, None
async def _run(self, *args, **kwargs) -> Any:
combined_args = args + self.args
combined_kwargs = self._maybe_inject_last_result({**self.kwargs, **kwargs})
@ -477,6 +487,14 @@ class ChainedAction(BaseAction, ActionListMixin):
if hasattr(action, "register_teardown") and callable(action.register_teardown):
action.register_teardown(self.hooks)
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
if self.actions:
return self.actions[0].get_infer_target()
return None, None
def _clear_args(self):
return (), {}
async def _run(self, *args, **kwargs) -> list[Any]:
if not self.actions:
raise EmptyChainError(f"[{self.name}] No actions to execute.")
@ -505,12 +523,8 @@ class ChainedAction(BaseAction, ActionListMixin):
continue
shared_context.current_index = index
prepared = action.prepare(shared_context, self.options_manager)
last_result = shared_context.last_result()
try:
if self.requires_io_injection() and last_result is not None:
result = await prepared(**{prepared.inject_into: last_result})
else:
result = await prepared(*args, **updated_kwargs)
result = await prepared(*args, **updated_kwargs)
except Exception as error:
if index + 1 < len(self.actions) and isinstance(
self.actions[index + 1], FallbackAction
@ -529,6 +543,7 @@ class ChainedAction(BaseAction, ActionListMixin):
fallback._skip_in_chain = True
else:
raise
args, updated_kwargs = self._clear_args()
shared_context.add_result(result)
context.extra["results"].append(result)
context.extra["rollback_stack"].append(prepared)
@ -669,6 +684,16 @@ class ActionGroup(BaseAction, ActionListMixin):
if hasattr(action, "register_teardown") and callable(action.register_teardown):
action.register_teardown(self.hooks)
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
arg_defs = same_argument_definitions(self.actions)
if arg_defs:
return self.actions[0].get_infer_target()
logger.debug(
"[%s] auto_args disabled: mismatched ActionGroup arguments",
self.name,
)
return None, None
async def _run(self, *args, **kwargs) -> list[tuple[str, Any]]:
shared_context = SharedContext(name=self.name, action=self, is_parallel=True)
if self.shared_context:
@ -787,8 +812,11 @@ class ProcessAction(BaseAction):
self.executor = executor or ProcessPoolExecutor()
self.is_retryable = True
async def _run(self, *args, **kwargs):
if self.inject_last_result:
def get_infer_target(self) -> tuple[Callable[..., Any] | None, None]:
return self.action, None
async def _run(self, *args, **kwargs) -> Any:
if self.inject_last_result and self.shared_context:
last_result = self.shared_context.last_result()
if not self._validate_pickleable(last_result):
raise ValueError(

View File

@ -1,6 +1,6 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""action_factory.py"""
from typing import Any
from typing import Any, Callable
from rich.tree import Tree
@ -35,6 +35,8 @@ class ActionFactoryAction(BaseAction):
*,
inject_last_result: bool = False,
inject_into: str = "last_result",
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
preview_args: tuple[Any, ...] = (),
preview_kwargs: dict[str, Any] | None = None,
):
@ -44,6 +46,8 @@ class ActionFactoryAction(BaseAction):
inject_into=inject_into,
)
self.factory = factory
self.args = args
self.kwargs = kwargs or {}
self.preview_args = preview_args
self.preview_kwargs = preview_kwargs or {}
@ -55,7 +59,12 @@ class ActionFactoryAction(BaseAction):
def factory(self, value: ActionFactoryProtocol):
self._factory = ensure_async(value)
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
return self.factory, None
async def _run(self, *args, **kwargs) -> Any:
args = (*self.args, *args)
kwargs = {**self.kwargs, **kwargs}
updated_kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(
name=f"{self.name} (factory)",
@ -85,7 +94,7 @@ class ActionFactoryAction(BaseAction):
)
if self.options_manager:
generated_action.set_options_manager(self.options_manager)
context.result = await generated_action(*args, **kwargs)
context.result = await generated_action()
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result
except Exception as error:

View File

@ -19,7 +19,7 @@ import asyncio
import shlex
import subprocess
import sys
from typing import Any
from typing import Any, Callable
from rich.tree import Tree
@ -73,7 +73,6 @@ class BaseIOAction(BaseAction):
inject_last_result=inject_last_result,
)
self.mode = mode
self._requires_injection = True
def from_input(self, raw: str | bytes) -> Any:
raise NotImplementedError
@ -81,15 +80,15 @@ class BaseIOAction(BaseAction):
def to_output(self, result: Any) -> str | bytes:
raise NotImplementedError
async def _resolve_input(self, kwargs: dict[str, Any]) -> str | bytes:
last_result = kwargs.pop(self.inject_into, None)
async def _resolve_input(
self, args: tuple[Any], kwargs: dict[str, Any]
) -> str | bytes:
data = await self._read_stdin()
if data:
return self.from_input(data)
if last_result is not None:
return last_result
if len(args) == 1:
return self.from_input(args[0])
if self.inject_last_result and self.shared_context:
return self.shared_context.last_result()
@ -99,6 +98,9 @@ class BaseIOAction(BaseAction):
)
raise FalyxError("No input provided and no last result to inject.")
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
return None, None
async def __call__(self, *args, **kwargs):
context = ExecutionContext(
name=self.name,
@ -117,8 +119,8 @@ class BaseIOAction(BaseAction):
pass
result = getattr(self, "_last_result", None)
else:
parsed_input = await self._resolve_input(kwargs)
result = await self._run(parsed_input, *args, **kwargs)
parsed_input = await self._resolve_input(args, kwargs)
result = await self._run(parsed_input)
output = self.to_output(result)
await self._write_stdout(output)
context.result = result
@ -195,7 +197,6 @@ class ShellAction(BaseIOAction):
- Captures stdout and stderr from shell execution
- Raises on non-zero exit codes with stderr as the error
- Result is returned as trimmed stdout string
- Compatible with ChainedAction and Command.requires_input detection
Args:
name (str): Name of the action.
@ -220,11 +221,19 @@ class ShellAction(BaseIOAction):
)
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
if sys.stdin.isatty():
return self._run, {"parsed_input": {"help": self.command_template}}
return None, None
async def _run(self, parsed_input: str) -> str:
# Replace placeholder in template, or use raw input as full command
command = self.command_template.format(parsed_input)
if self.safe_mode:
args = shlex.split(command)
try:
args = shlex.split(command)
except ValueError as error:
raise FalyxError(f"Invalid command template: {error}")
result = subprocess.run(args, capture_output=True, text=True, check=True)
else:
result = subprocess.run(

View File

@ -73,6 +73,9 @@ class MenuAction(BaseAction):
table.add_row(*row)
return table
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any:
kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(

View File

@ -25,6 +25,7 @@ from falyx.selection import (
prompt_for_selection,
render_selection_dict_table,
)
from falyx.signals import CancelSignal
from falyx.themes import OneColors
@ -121,6 +122,16 @@ class SelectFileAction(BaseAction):
logger.warning("[ERROR] Failed to parse %s: %s", file.name, error)
return options
def _find_cancel_key(self, options) -> str:
"""Return first numeric value not already used in the selection dict."""
for index in range(len(options)):
if str(index) not in options:
return str(index)
return str(len(options))
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any:
context = ExecutionContext(name=self.name, args=args, kwargs=kwargs, action=self)
context.start_timer()
@ -128,28 +139,38 @@ class SelectFileAction(BaseAction):
await self.hooks.trigger(HookType.BEFORE, context)
files = [
f
for f in self.directory.iterdir()
if f.is_file()
and (self.suffix_filter is None or f.suffix == self.suffix_filter)
file
for file in self.directory.iterdir()
if file.is_file()
and (self.suffix_filter is None or file.suffix == self.suffix_filter)
]
if not files:
raise FileNotFoundError("No files found in directory.")
options = self.get_options(files)
cancel_key = self._find_cancel_key(options)
cancel_option = {
cancel_key: SelectionOption(
description="Cancel", value=CancelSignal(), style=OneColors.DARK_RED
)
}
table = render_selection_dict_table(
title=self.title, selections=options, columns=self.columns
title=self.title, selections=options | cancel_option, columns=self.columns
)
key = await prompt_for_selection(
options.keys(),
(options | cancel_option).keys(),
table,
console=self.console,
prompt_session=self.prompt_session,
prompt_message=self.prompt_message,
)
if key == cancel_key:
raise CancelSignal("User canceled the selection.")
result = options[key].value
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
@ -176,11 +197,11 @@ class SelectFileAction(BaseAction):
try:
files = list(self.directory.iterdir())
if self.suffix_filter:
files = [f for f in files if f.suffix == self.suffix_filter]
files = [file for file in files if file.suffix == self.suffix_filter]
sample = files[:10]
file_list = tree.add("[dim]Files:[/]")
for f in sample:
file_list.add(f"[dim]{f.name}[/]")
for file in sample:
file_list.add(f"[dim]{file.name}[/]")
if len(files) > 10:
file_list.add(f"[dim]... ({len(files) - 10} more)[/]")
except Exception as error:

View File

@ -7,19 +7,21 @@ from rich.console import Console
from rich.tree import Tree
from falyx.action.action import BaseAction
from falyx.action.types import SelectionReturnType
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType
from falyx.logger import logger
from falyx.selection import (
SelectionOption,
SelectionOptionMap,
prompt_for_index,
prompt_for_selection,
render_selection_dict_table,
render_selection_indexed_table,
)
from falyx.signals import CancelSignal
from falyx.themes import OneColors
from falyx.utils import CaseInsensitiveDict
class SelectionAction(BaseAction):
@ -34,7 +36,13 @@ class SelectionAction(BaseAction):
def __init__(
self,
name: str,
selections: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption],
selections: (
list[str]
| set[str]
| tuple[str, ...]
| dict[str, SelectionOption]
| dict[str, Any]
),
*,
title: str = "Select an option",
columns: int = 5,
@ -42,7 +50,7 @@ class SelectionAction(BaseAction):
default_selection: str = "",
inject_last_result: bool = False,
inject_into: str = "last_result",
return_key: bool = False,
return_type: SelectionReturnType | str = "value",
console: Console | None = None,
prompt_session: PromptSession | None = None,
never_prompt: bool = False,
@ -55,8 +63,8 @@ class SelectionAction(BaseAction):
never_prompt=never_prompt,
)
# Setter normalizes to correct type, mypy can't infer that
self.selections: list[str] | CaseInsensitiveDict = selections # type: ignore[assignment]
self.return_key = return_key
self.selections: list[str] | SelectionOptionMap = selections # type: ignore[assignment]
self.return_type: SelectionReturnType = self._coerce_return_type(return_type)
self.title = title
self.columns = columns
self.console = console or Console(color_system="auto")
@ -65,8 +73,15 @@ class SelectionAction(BaseAction):
self.prompt_message = prompt_message
self.show_table = show_table
def _coerce_return_type(
self, return_type: SelectionReturnType | str
) -> SelectionReturnType:
if isinstance(return_type, SelectionReturnType):
return return_type
return SelectionReturnType(return_type)
@property
def selections(self) -> list[str] | CaseInsensitiveDict:
def selections(self) -> list[str] | SelectionOptionMap:
return self._selections
@selections.setter
@ -74,17 +89,41 @@ class SelectionAction(BaseAction):
self, value: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption]
):
if isinstance(value, (list, tuple, set)):
self._selections: list[str] | CaseInsensitiveDict = list(value)
self._selections: list[str] | SelectionOptionMap = list(value)
elif isinstance(value, dict):
cid = CaseInsensitiveDict()
cid.update(value)
self._selections = cid
som = SelectionOptionMap()
if all(isinstance(key, str) for key in value) and all(
not isinstance(value[key], SelectionOption) for key in value
):
som.update(
{
str(index): SelectionOption(key, option)
for index, (key, option) in enumerate(value.items())
}
)
elif all(isinstance(key, str) for key in value) and all(
isinstance(value[key], SelectionOption) for key in value
):
som.update(value)
else:
raise ValueError("Invalid dictionary format. Keys must be strings")
self._selections = som
else:
raise TypeError(
"'selections' must be a list[str] or dict[str, SelectionOption], "
f"got {type(value).__name__}"
)
def _find_cancel_key(self) -> str:
"""Return first numeric value not already used in the selection dict."""
for index in range(len(self.selections)):
if str(index) not in self.selections:
return str(index)
return str(len(self.selections))
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any:
kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(
@ -125,16 +164,17 @@ class SelectionAction(BaseAction):
context.start_timer()
try:
cancel_key = self._find_cancel_key()
await self.hooks.trigger(HookType.BEFORE, context)
if isinstance(self.selections, list):
table = render_selection_indexed_table(
title=self.title,
selections=self.selections,
selections=self.selections + ["Cancel"],
columns=self.columns,
)
if not self.never_prompt:
index = await prompt_for_index(
len(self.selections) - 1,
len(self.selections),
table,
default_selection=effective_default,
console=self.console,
@ -144,14 +184,23 @@ class SelectionAction(BaseAction):
)
else:
index = effective_default
result = self.selections[int(index)]
if index == cancel_key:
raise CancelSignal("User cancelled the selection.")
result: Any = self.selections[int(index)]
elif isinstance(self.selections, dict):
cancel_option = {
cancel_key: SelectionOption(
description="Cancel", value=CancelSignal, style=OneColors.DARK_RED
)
}
table = render_selection_dict_table(
title=self.title, selections=self.selections, columns=self.columns
title=self.title,
selections=self.selections | cancel_option,
columns=self.columns,
)
if not self.never_prompt:
key = await prompt_for_selection(
self.selections.keys(),
(self.selections | cancel_option).keys(),
table,
default_selection=effective_default,
console=self.console,
@ -161,10 +210,25 @@ class SelectionAction(BaseAction):
)
else:
key = effective_default
result = key if self.return_key else self.selections[key].value
if key == cancel_key:
raise CancelSignal("User cancelled the selection.")
if self.return_type == SelectionReturnType.KEY:
result = key
elif self.return_type == SelectionReturnType.VALUE:
result = self.selections[key].value
elif self.return_type == SelectionReturnType.ITEMS:
result = {key: self.selections[key]}
elif self.return_type == SelectionReturnType.DESCRIPTION:
result = self.selections[key].description
elif self.return_type == SelectionReturnType.DESCRIPTION_VALUE:
result = {
self.selections[key].description: self.selections[key].value
}
else:
raise ValueError(f"Unsupported return type: {self.return_type}")
else:
raise TypeError(
"'selections' must be a list[str] or dict[str, tuple[str, Any]], "
"'selections' must be a list[str] or dict[str, Any], "
f"got {type(self.selections).__name__}"
)
context.result = result
@ -203,7 +267,7 @@ class SelectionAction(BaseAction):
return
tree.add(f"[dim]Default:[/] '{self.default_selection or self.last_result}'")
tree.add(f"[dim]Return:[/] {'Key' if self.return_key else 'Value'}")
tree.add(f"[dim]Return:[/] {self.return_type.name.capitalize()}")
tree.add(f"[dim]Prompt:[/] {'Disabled' if self.never_prompt else 'Enabled'}")
if not parent:
@ -218,6 +282,6 @@ class SelectionAction(BaseAction):
return (
f"SelectionAction(name={self.name!r}, type={selection_type}, "
f"default_selection={self.default_selection!r}, "
f"return_key={self.return_key}, "
f"return_type={self.return_type!r}, "
f"prompt={'off' if self.never_prompt else 'on'})"
)

View File

@ -35,3 +35,18 @@ class FileReturnType(Enum):
return member
valid = ", ".join(member.value for member in cls)
raise ValueError(f"Invalid FileReturnType: '{value}'. Must be one of: {valid}")
class SelectionReturnType(Enum):
"""Enum for dictionary return types."""
KEY = "key"
VALUE = "value"
DESCRIPTION = "description"
DESCRIPTION_VALUE = "description_value"
ITEMS = "items"
@classmethod
def _missing_(cls, value: object) -> SelectionReturnType:
valid = ", ".join(member.value for member in cls)
raise ValueError(f"Invalid DictReturnType: '{value}'. Must be one of: {valid}")

View File

@ -43,6 +43,9 @@ class UserInputAction(BaseAction):
self.console = console or Console(color_system="auto")
self.prompt_session = prompt_session or PromptSession()
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> str:
context = ExecutionContext(
name=self.name,

View File

@ -19,7 +19,6 @@ in building robust interactive menus.
from __future__ import annotations
import shlex
from functools import cached_property
from typing import Any, Callable
from prompt_toolkit.formatted_text import FormattedText
@ -27,15 +26,15 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from rich.console import Console
from rich.tree import Tree
from falyx.action.action import Action, ActionGroup, BaseAction, ChainedAction
from falyx.action.io_action import BaseIOAction
from falyx.argparse import CommandArgumentParser
from falyx.action.action import Action, BaseAction
from falyx.context import ExecutionContext
from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.logger import logger
from falyx.options_manager import OptionsManager
from falyx.parsers.argparse import CommandArgumentParser
from falyx.parsers.signature import infer_args_from_func
from falyx.prompt_utils import confirm_async, should_prompt_user
from falyx.protocols import ArgParserProtocol
from falyx.retry import RetryPolicy
@ -89,7 +88,11 @@ class Command(BaseModel):
retry_policy (RetryPolicy): Retry behavior configuration.
tags (list[str]): Organizational tags for the command.
logging_hooks (bool): Whether to attach logging hooks automatically.
requires_input (bool | None): Indicates if the action needs input.
options_manager (OptionsManager): Manages global command-line options.
arg_parser (CommandArgumentParser): Parses command arguments.
custom_parser (ArgParserProtocol | None): Custom argument parser.
custom_help (Callable[[], str | None] | None): Custom help message generator.
auto_args (bool): Automatically infer arguments from the action.
Methods:
__call__(): Executes the command, respecting hooks and retries.
@ -101,12 +104,13 @@ class Command(BaseModel):
key: str
description: str
action: BaseAction | Callable[[], Any]
action: BaseAction | Callable[..., Any]
args: tuple = ()
kwargs: dict[str, Any] = Field(default_factory=dict)
hidden: bool = False
aliases: list[str] = Field(default_factory=list)
help_text: str = ""
help_epilogue: str = ""
style: str = OneColors.WHITE
confirm: bool = False
confirm_message: str = "Are you sure?"
@ -122,25 +126,46 @@ class Command(BaseModel):
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
tags: list[str] = Field(default_factory=list)
logging_hooks: bool = False
requires_input: bool | None = None
options_manager: OptionsManager = Field(default_factory=OptionsManager)
arg_parser: CommandArgumentParser = Field(default_factory=CommandArgumentParser)
arguments: list[dict[str, Any]] = Field(default_factory=list)
argument_config: Callable[[CommandArgumentParser], None] | None = None
custom_parser: ArgParserProtocol | None = None
custom_help: Callable[[], str | None] | None = None
auto_args: bool = True
arg_metadata: dict[str, str | dict[str, Any]] = Field(default_factory=dict)
_context: ExecutionContext | None = PrivateAttr(default=None)
model_config = ConfigDict(arbitrary_types_allowed=True)
def parse_args(self, raw_args: list[str] | str) -> tuple[tuple, dict]:
if self.custom_parser:
def parse_args(
self, raw_args: list[str] | str, from_validate: bool = False
) -> tuple[tuple, dict]:
if callable(self.custom_parser):
if isinstance(raw_args, str):
raw_args = shlex.split(raw_args)
try:
raw_args = shlex.split(raw_args)
except ValueError:
logger.warning(
"[Command:%s] Failed to split arguments: %s",
self.key,
raw_args,
)
return ((), {})
return self.custom_parser(raw_args)
if isinstance(raw_args, str):
raw_args = shlex.split(raw_args)
return self.arg_parser.parse_args_split(raw_args)
try:
raw_args = shlex.split(raw_args)
except ValueError:
logger.warning(
"[Command:%s] Failed to split arguments: %s",
self.key,
raw_args,
)
return ((), {})
return self.arg_parser.parse_args_split(raw_args, from_validate=from_validate)
@field_validator("action", mode="before")
@classmethod
@ -151,11 +176,24 @@ class Command(BaseModel):
return ensure_async(action)
raise TypeError("Action must be a callable or an instance of BaseAction")
def get_argument_definitions(self) -> list[dict[str, Any]]:
if self.arguments:
return self.arguments
elif callable(self.argument_config):
self.argument_config(self.arg_parser)
elif self.auto_args:
if isinstance(self.action, BaseAction):
infer_target, maybe_metadata = self.action.get_infer_target()
# merge metadata with the action's metadata if not already in self.arg_metadata
if maybe_metadata:
self.arg_metadata = {**maybe_metadata, **self.arg_metadata}
return infer_args_from_func(infer_target, self.arg_metadata)
elif callable(self.action):
return infer_args_from_func(self.action, self.arg_metadata)
return []
def model_post_init(self, _: Any) -> None:
"""Post-initialization to set up the action and hooks."""
if isinstance(self.arg_parser, CommandArgumentParser):
self.arg_parser.command_description = self.description
if self.retry and isinstance(self.action, Action):
self.action.enable_retry()
elif self.retry_policy and isinstance(self.action, Action):
@ -177,26 +215,8 @@ class Command(BaseModel):
if self.logging_hooks and isinstance(self.action, BaseAction):
register_debug_hooks(self.action.hooks)
if self.requires_input is None and self.detect_requires_input:
self.requires_input = True
self.hidden = True
elif self.requires_input is None:
self.requires_input = False
@cached_property
def detect_requires_input(self) -> bool:
"""Detect if the action requires input based on its type."""
if isinstance(self.action, BaseIOAction):
return True
elif isinstance(self.action, ChainedAction):
return (
isinstance(self.action.actions[0], BaseIOAction)
if self.action.actions
else False
)
elif isinstance(self.action, ActionGroup):
return any(isinstance(action, BaseIOAction) for action in self.action.actions)
return False
for arg_def in self.get_argument_definitions():
self.arg_parser.add_argument(*arg_def.pop("flags"), **arg_def)
def _inject_options_manager(self) -> None:
"""Inject the options manager into the action if applicable."""
@ -290,7 +310,7 @@ class Command(BaseModel):
def show_help(self) -> bool:
"""Display the help message for the command."""
if self.custom_help:
if callable(self.custom_help):
output = self.custom_help()
if output:
console.print(output)

View File

@ -98,7 +98,6 @@ class RawCommand(BaseModel):
retry: bool = False
retry_all: bool = False
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
requires_input: bool | None = None
hidden: bool = False
help_text: str = ""

View File

@ -58,11 +58,12 @@ from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.logger import logger
from falyx.options_manager import OptionsManager
from falyx.parsers import get_arg_parsers
from falyx.parsers import CommandArgumentParser, get_arg_parsers
from falyx.protocols import ArgParserProtocol
from falyx.retry import RetryPolicy
from falyx.signals import BackSignal, CancelSignal, HelpSignal, QuitSignal
from falyx.themes import OneColors, get_nord_theme
from falyx.utils import CaseInsensitiveDict, _noop, chunks, get_program_invocation
from falyx.utils import CaseInsensitiveDict, _noop, chunks
from falyx.version import __version__
@ -89,7 +90,7 @@ class CommandValidator(Validator):
if not choice:
raise ValidationError(
message=self.error_message,
cursor_position=document.get_end_of_document_position(),
cursor_position=len(text),
)
@ -110,6 +111,8 @@ class Falyx:
- Submenu nesting and action chaining
- History tracking, help generation, and run key execution modes
- Seamless CLI argument parsing and integration via argparse
- Declarative option management with OptionsManager
- Command level argument parsing and validation
- Extensible with user-defined hooks, bottom bars, and custom layouts
Args:
@ -125,7 +128,7 @@ class Falyx:
never_prompt (bool): Seed default for `OptionsManager["never_prompt"]`
force_confirm (bool): Seed default for `OptionsManager["force_confirm"]`
cli_args (Namespace | None): Parsed CLI arguments, usually from argparse.
options (OptionsManager | None): Declarative option mappings.
options (OptionsManager | None): Declarative option mappings for global state.
custom_table (Callable[[Falyx], Table] | Table | None): Custom menu table
generator.
@ -157,8 +160,9 @@ class Falyx:
force_confirm: bool = False,
cli_args: Namespace | None = None,
options: OptionsManager | None = None,
render_menu: Callable[["Falyx"], None] | None = None,
custom_table: Callable[["Falyx"], Table] | Table | None = None,
render_menu: Callable[[Falyx], None] | None = None,
custom_table: Callable[[Falyx], Table] | Table | None = None,
hide_menu_table: bool = False,
) -> None:
"""Initializes the Falyx object."""
self.title: str | Markdown = title
@ -182,8 +186,9 @@ class Falyx:
self._never_prompt: bool = never_prompt
self._force_confirm: bool = force_confirm
self.cli_args: Namespace | None = cli_args
self.render_menu: Callable[["Falyx"], None] | None = render_menu
self.custom_table: Callable[["Falyx"], Table] | Table | None = custom_table
self.render_menu: Callable[[Falyx], None] | None = render_menu
self.custom_table: Callable[[Falyx], Table] | Table | None = custom_table
self.hide_menu_table: bool = hide_menu_table
self.validate_options(cli_args, options)
self._prompt_session: PromptSession | None = None
self.mode = FalyxMode.MENU
@ -286,8 +291,6 @@ class Falyx:
for command in self.commands.values():
help_text = command.help_text or command.description
if command.requires_input:
help_text += " [dim](requires input)[/dim]"
table.add_row(
f"[{command.style}]{command.key}[/]",
", ".join(command.aliases) if command.aliases else "",
@ -524,7 +527,7 @@ class Falyx:
key: str = "X",
description: str = "Exit",
aliases: list[str] | None = None,
action: Callable[[], Any] | None = None,
action: Callable[..., Any] | None = None,
style: str = OneColors.DARK_RED,
confirm: bool = False,
confirm_message: str = "Are you sure?",
@ -578,13 +581,14 @@ class Falyx:
self,
key: str,
description: str,
action: BaseAction | Callable[[], Any],
action: BaseAction | Callable[..., Any],
*,
args: tuple = (),
kwargs: dict[str, Any] | None = None,
hidden: bool = False,
aliases: list[str] | None = None,
help_text: str = "",
help_epilogue: str = "",
style: str = OneColors.WHITE,
confirm: bool = False,
confirm_message: str = "Are you sure?",
@ -605,10 +609,33 @@ class Falyx:
retry: bool = False,
retry_all: bool = False,
retry_policy: RetryPolicy | None = None,
requires_input: bool | None = None,
arg_parser: CommandArgumentParser | None = None,
arguments: list[dict[str, Any]] | None = None,
argument_config: Callable[[CommandArgumentParser], None] | None = None,
custom_parser: ArgParserProtocol | None = None,
custom_help: Callable[[], str | None] | None = None,
auto_args: bool = True,
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> Command:
"""Adds an command to the menu, preventing duplicates."""
self._validate_command_key(key)
if arg_parser:
if not isinstance(arg_parser, CommandArgumentParser):
raise NotAFalyxError(
"arg_parser must be an instance of CommandArgumentParser."
)
arg_parser = arg_parser
else:
arg_parser = CommandArgumentParser(
command_key=key,
command_description=description,
command_style=style,
help_text=help_text,
help_epilogue=help_epilogue,
aliases=aliases,
)
command = Command(
key=key,
description=description,
@ -618,6 +645,7 @@ class Falyx:
hidden=hidden,
aliases=aliases if aliases else [],
help_text=help_text,
help_epilogue=help_epilogue,
style=style,
confirm=confirm,
confirm_message=confirm_message,
@ -632,8 +660,14 @@ class Falyx:
retry=retry,
retry_all=retry_all,
retry_policy=retry_policy or RetryPolicy(),
requires_input=requires_input,
options_manager=self.options,
arg_parser=arg_parser,
arguments=arguments or [],
argument_config=argument_config,
custom_parser=custom_parser,
custom_help=custom_help,
auto_args=auto_args,
arg_metadata=arg_metadata or {},
)
if hooks:
@ -715,7 +749,10 @@ class Falyx:
"""
args = ()
kwargs: dict[str, Any] = {}
choice, *input_args = shlex.split(raw_choices)
try:
choice, *input_args = shlex.split(raw_choices)
except ValueError:
return False, None, args, kwargs
is_preview, choice = self.parse_preview_command(choice)
if is_preview and not choice and self.help_command:
is_preview = False
@ -730,24 +767,27 @@ class Falyx:
choice = choice.upper()
name_map = self._name_map
if choice in name_map:
if name_map.get(choice):
if not from_validate:
logger.info("Command '%s' selected.", choice)
if input_args and name_map[choice].arg_parser:
try:
args, kwargs = name_map[choice].parse_args(input_args)
except CommandArgumentError as error:
if not from_validate:
if not name_map[choice].show_help():
self.console.print(
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
)
else:
name_map[choice].show_help()
raise ValidationError(
message=str(error), cursor_position=len(raw_choices)
if is_preview:
return True, name_map[choice], args, kwargs
try:
args, kwargs = name_map[choice].parse_args(input_args, from_validate)
except CommandArgumentError as error:
if not from_validate:
if not name_map[choice].show_help():
self.console.print(
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
)
return is_preview, None, args, kwargs
else:
name_map[choice].show_help()
raise ValidationError(
message=str(error), cursor_position=len(raw_choices)
)
return is_preview, None, args, kwargs
except HelpSignal:
return True, None, args, kwargs
return is_preview, name_map[choice], args, kwargs
prefix_matches = [cmd for key, cmd in name_map.items() if key.startswith(choice)]
@ -804,15 +844,6 @@ class Falyx:
await selected_command.preview()
return True
if selected_command.requires_input:
program = get_program_invocation()
self.console.print(
f"[{OneColors.LIGHT_YELLOW}]⚠️ Command '{selected_command.key}' requires"
f" input and must be run via [{OneColors.MAGENTA}]'{program} run"
f"'[{OneColors.LIGHT_YELLOW}] with proper piping or arguments.[/]"
)
return True
self.last_run_command = selected_command
if selected_command == self.exit_command:
@ -823,7 +854,6 @@ class Falyx:
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
print(args, kwargs)
result = await selected_command(*args, **kwargs)
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
@ -945,10 +975,11 @@ class Falyx:
self.print_message(self.welcome_message)
try:
while True:
if callable(self.render_menu):
self.render_menu(self)
else:
self.console.print(self.table, justify="center")
if not self.hide_menu_table:
if callable(self.render_menu):
self.render_menu(self)
else:
self.console.print(self.table, justify="center")
try:
task = asyncio.create_task(self.process_command())
should_continue = await task
@ -964,8 +995,6 @@ class Falyx:
logger.info("BackSignal received.")
except CancelSignal:
logger.info("CancelSignal received.")
except HelpSignal:
logger.info("HelpSignal received.")
finally:
logger.info("Exiting menu: %s", self.get_title())
if self.exit_message:
@ -995,7 +1024,7 @@ class Falyx:
sys.exit(0)
if self.cli_args.command == "version" or self.cli_args.version:
self.console.print(f"[{OneColors.GREEN_b}]Falyx CLI v{__version__}[/]")
self.console.print(f"[{OneColors.BLUE_b}]Falyx CLI v{__version__}[/]")
sys.exit(0)
if self.cli_args.command == "preview":

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import inspect
from enum import Enum
from typing import Awaitable, Callable, Dict, List, Optional, Union
from typing import Awaitable, Callable, Union
from falyx.context import ExecutionContext
from falyx.logger import logger
@ -24,7 +24,7 @@ class HookType(Enum):
ON_TEARDOWN = "on_teardown"
@classmethod
def choices(cls) -> List[HookType]:
def choices(cls) -> list[HookType]:
"""Return a list of all hook type choices."""
return list(cls)
@ -37,16 +37,17 @@ class HookManager:
"""HookManager"""
def __init__(self) -> None:
self._hooks: Dict[HookType, List[Hook]] = {
self._hooks: dict[HookType, list[Hook]] = {
hook_type: [] for hook_type in HookType
}
def register(self, hook_type: HookType, hook: Hook):
if hook_type not in HookType:
raise ValueError(f"Unsupported hook type: {hook_type}")
def register(self, hook_type: HookType | str, hook: Hook):
"""Raises ValueError if the hook type is not supported."""
if not isinstance(hook_type, HookType):
hook_type = HookType(hook_type)
self._hooks[hook_type].append(hook)
def clear(self, hook_type: Optional[HookType] = None):
def clear(self, hook_type: HookType | None = None):
if hook_type:
self._hooks[hook_type] = []
else:

View File

@ -33,7 +33,7 @@ class MenuOptionMap(CaseInsensitiveDict):
and special signal entries like Quit and Back.
"""
RESERVED_KEYS = {"Q", "B"}
RESERVED_KEYS = {"B", "X"}
def __init__(
self,
@ -49,14 +49,14 @@ class MenuOptionMap(CaseInsensitiveDict):
def _inject_reserved_defaults(self):
from falyx.action import SignalAction
self._add_reserved(
"Q",
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
)
self._add_reserved(
"B",
MenuOption("Back", SignalAction("Back", BackSignal()), OneColors.DARK_YELLOW),
)
self._add_reserved(
"X",
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
)
def _add_reserved(self, key: str, option: MenuOption) -> None:
"""Add a reserved key, bypassing validation."""
@ -78,8 +78,20 @@ class MenuOptionMap(CaseInsensitiveDict):
raise ValueError(f"Cannot delete reserved option '{key}'.")
super().__delitem__(key)
def update(self, other=None, **kwargs):
"""Update the selection options with another dictionary."""
if other:
for key, option in other.items():
if not isinstance(option, MenuOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
for key, option in kwargs.items():
if not isinstance(option, MenuOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
def items(self, include_reserved: bool = True):
for k, v in super().items():
if not include_reserved and k in self.RESERVED_KEYS:
for key, option in super().items():
if not include_reserved and key in self.RESERVED_KEYS:
continue
yield k, v
yield key, option

0
falyx/parsers/.pytyped Normal file
View File

17
falyx/parsers/__init__.py Normal file
View File

@ -0,0 +1,17 @@
"""
Falyx CLI Framework
Copyright (c) 2025 rtj.dev LLC.
Licensed under the MIT License. See LICENSE file for details.
"""
from .argparse import Argument, ArgumentAction, CommandArgumentParser
from .parsers import FalyxParsers, get_arg_parsers
__all__ = [
"Argument",
"ArgumentAction",
"CommandArgumentParser",
"get_arg_parsers",
"FalyxParsers",
]

View File

@ -1,11 +1,14 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterable
from rich.console import Console
from rich.table import Table
from rich.markup import escape
from rich.text import Text
from falyx.exceptions import CommandArgumentError
from falyx.signals import HelpSignal
@ -22,6 +25,15 @@ class ArgumentAction(Enum):
COUNT = "count"
HELP = "help"
@classmethod
def choices(cls) -> list[ArgumentAction]:
"""Return a list of all argument actions."""
return list(cls)
def __str__(self) -> str:
"""Return the string representation of the argument action."""
return self.value
@dataclass
class Argument:
@ -40,6 +52,74 @@ class Argument:
nargs: int | str = 1 # int, '?', '*', '+'
positional: bool = False # True if no leading - or -- in flags
def get_positional_text(self) -> str:
"""Get the positional text for the argument."""
text = ""
if self.positional:
if self.choices:
text = f"{{{','.join([str(choice) for choice in self.choices])}}}"
else:
text = self.dest
return text
def get_choice_text(self) -> str:
"""Get the choice text for the argument."""
choice_text = ""
if self.choices:
choice_text = f"{{{','.join([str(choice) for choice in self.choices])}}}"
elif (
self.action
in (
ArgumentAction.STORE,
ArgumentAction.APPEND,
ArgumentAction.EXTEND,
)
and not self.positional
):
choice_text = self.dest.upper()
elif self.action in (
ArgumentAction.STORE,
ArgumentAction.APPEND,
ArgumentAction.EXTEND,
) or isinstance(self.nargs, str):
choice_text = self.dest
if self.nargs == "?":
choice_text = f"[{choice_text}]"
elif self.nargs == "*":
choice_text = f"[{choice_text} ...]"
elif self.nargs == "+":
choice_text = f"{choice_text} [{choice_text} ...]"
return choice_text
def __eq__(self, other: object) -> bool:
if not isinstance(other, Argument):
return False
return (
self.flags == other.flags
and self.dest == other.dest
and self.action == other.action
and self.type == other.type
and self.choices == other.choices
and self.required == other.required
and self.nargs == other.nargs
and self.positional == other.positional
)
def __hash__(self) -> int:
return hash(
(
tuple(self.flags),
self.dest,
self.action,
self.type,
tuple(self.choices or []),
self.required,
self.nargs,
self.positional,
)
)
class CommandArgumentParser:
"""
@ -61,10 +141,25 @@ class CommandArgumentParser:
- Render Help using Rich library.
"""
def __init__(self) -> None:
def __init__(
self,
command_key: str = "",
command_description: str = "",
command_style: str = "bold",
help_text: str = "",
help_epilogue: str = "",
aliases: list[str] | None = None,
) -> None:
"""Initialize the CommandArgumentParser."""
self.command_description: str = ""
self.command_key: str = command_key
self.command_description: str = command_description
self.command_style: str = command_style
self.help_text: str = help_text
self.help_epilogue: str = help_epilogue
self.aliases: list[str] = aliases or []
self._arguments: list[Argument] = []
self._positional: list[Argument] = []
self._keyword: list[Argument] = []
self._flag_map: dict[str, Argument] = {}
self._dest_set: set[str] = set()
self._add_help()
@ -73,10 +168,10 @@ class CommandArgumentParser:
def _add_help(self):
"""Add help argument to the parser."""
self.add_argument(
"--help",
"-h",
"--help",
action=ArgumentAction.HELP,
help="Show this help message and exit.",
help="Show this help message.",
dest="help",
)
@ -304,10 +399,31 @@ class CommandArgumentParser:
)
self._flag_map[flag] = argument
self._arguments.append(argument)
if positional:
self._positional.append(argument)
else:
self._keyword.append(argument)
def get_argument(self, dest: str) -> Argument | None:
return next((a for a in self._arguments if a.dest == dest), None)
def to_definition_list(self) -> list[dict[str, Any]]:
defs = []
for arg in self._arguments:
defs.append(
{
"flags": arg.flags,
"dest": arg.dest,
"action": arg.action,
"type": arg.type,
"choices": arg.choices,
"required": arg.required,
"nargs": arg.nargs,
"positional": arg.positional,
}
)
return defs
def _consume_nargs(
self, args: list[str], start: int, spec: Argument
) -> tuple[list[str], int]:
@ -405,7 +521,9 @@ class CommandArgumentParser:
return i
def parse_args(self, args: list[str] | None = None) -> dict[str, Any]:
def parse_args(
self, args: list[str] | None = None, from_validate: bool = False
) -> dict[str, Any]:
"""Parse Falyx Command arguments."""
if args is None:
args = []
@ -423,7 +541,8 @@ class CommandArgumentParser:
action = spec.action
if action == ArgumentAction.HELP:
self.render_help()
if not from_validate:
self.render_help()
raise HelpSignal()
elif action == ArgumentAction.STORE_TRUE:
result[spec.dest] = True
@ -550,13 +669,15 @@ class CommandArgumentParser:
result.pop("help", None)
return result
def parse_args_split(self, args: list[str]) -> tuple[tuple[Any, ...], dict[str, Any]]:
def parse_args_split(
self, args: list[str], from_validate: bool = False
) -> tuple[tuple[Any, ...], dict[str, Any]]:
"""
Returns:
tuple[args, kwargs] - Positional arguments in defined order,
followed by keyword argument mapping.
"""
parsed = self.parse_args(args)
parsed = self.parse_args(args, from_validate)
args_list = []
kwargs_dict = {}
for arg in self._arguments:
@ -568,20 +689,74 @@ class CommandArgumentParser:
kwargs_dict[arg.dest] = parsed[arg.dest]
return tuple(args_list), kwargs_dict
def render_help(self):
table = Table(title=f"{self.command_description} Help")
table.add_column("Flags")
table.add_column("Help")
for arg in self._arguments:
if arg.dest == "help":
continue
flag_str = ", ".join(arg.flags) if not arg.positional else arg.dest
table.add_row(flag_str, arg.help or "")
table.add_section()
arg = self.get_argument("help")
flag_str = ", ".join(arg.flags) if not arg.positional else arg.dest
table.add_row(flag_str, arg.help or "")
self.console.print(table)
def render_help(self) -> None:
# Options
# Add all keyword arguments to the options list
options_list = []
for arg in self._keyword:
choice_text = arg.get_choice_text()
if choice_text:
options_list.extend([f"[{arg.flags[0]} {choice_text}]"])
else:
options_list.extend([f"[{arg.flags[0]}]"])
# Add positional arguments to the options list
for arg in self._positional:
choice_text = arg.get_choice_text()
if isinstance(arg.nargs, int):
choice_text = " ".join([choice_text] * arg.nargs)
options_list.append(escape(choice_text))
options_text = " ".join(options_list)
command_keys = " | ".join(
[f"[{self.command_style}]{self.command_key}[/{self.command_style}]"]
+ [
f"[{self.command_style}]{alias}[/{self.command_style}]"
for alias in self.aliases
]
)
usage = f"usage: {command_keys} {options_text}"
self.console.print(f"[bold]{usage}[/bold]\n")
# Description
if self.help_text:
self.console.print(self.help_text + "\n")
# Arguments
if self._arguments:
if self._positional:
self.console.print("[bold]positional:[/bold]")
for arg in self._positional:
flags = arg.get_positional_text()
arg_line = Text(f" {flags:<30} ")
help_text = arg.help or ""
arg_line.append(help_text)
self.console.print(arg_line)
self.console.print("[bold]options:[/bold]")
for arg in self._keyword:
flags = ", ".join(arg.flags)
flags_choice = f"{flags} {arg.get_choice_text()}"
arg_line = Text(f" {flags_choice:<30} ")
help_text = arg.help or ""
arg_line.append(help_text)
self.console.print(arg_line)
# Epilogue
if self.help_epilogue:
self.console.print("\n" + self.help_epilogue, style="dim")
def __eq__(self, other: object) -> bool:
if not isinstance(other, CommandArgumentParser):
return False
def sorted_args(parser):
return sorted(parser._arguments, key=lambda a: a.dest)
return sorted_args(self) == sorted_args(other)
def __hash__(self) -> int:
return hash(tuple(sorted(self._arguments, key=lambda a: a.dest)))
def __str__(self) -> str:
positional = sum(arg.positional for arg in self._arguments)

View File

@ -114,7 +114,7 @@ def get_arg_parsers(
help="Skip confirmation prompts",
)
run_group.add_argument(
run_parser.add_argument(
"command_args",
nargs=REMAINDER,
help="Arguments to pass to the command (if applicable)",

View File

@ -0,0 +1,74 @@
import inspect
from typing import Any, Callable
from falyx.logger import logger
def infer_args_from_func(
func: Callable[[Any], Any] | None,
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
"""
Infer argument definitions from a callable's signature.
Returns a list of kwargs suitable for CommandArgumentParser.add_argument.
"""
if not callable(func):
logger.debug("Provided argument is not callable: %s", func)
return []
arg_metadata = arg_metadata or {}
signature = inspect.signature(func)
arg_defs = []
for name, param in signature.parameters.items():
raw_metadata = arg_metadata.get(name, {})
metadata = (
{"help": raw_metadata} if isinstance(raw_metadata, str) else raw_metadata
)
if param.kind not in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
inspect.Parameter.KEYWORD_ONLY,
):
continue
arg_type = (
param.annotation if param.annotation is not inspect.Parameter.empty else str
)
default = param.default if param.default is not inspect.Parameter.empty else None
is_required = param.default is inspect.Parameter.empty
if is_required:
flags = [f"{name.replace('_', '-')}"]
else:
flags = [f"--{name.replace('_', '-')}"]
action = "store"
nargs: int | str = 1
if arg_type is bool:
if param.default is False:
action = "store_true"
else:
action = "store_false"
if arg_type is list:
action = "append"
if is_required:
nargs = "+"
else:
nargs = "*"
arg_defs.append(
{
"flags": flags,
"dest": name,
"type": arg_type,
"default": default,
"required": is_required,
"nargs": nargs,
"action": action,
"help": metadata.get("help", ""),
"choices": metadata.get("choices"),
}
)
return arg_defs

28
falyx/parsers/utils.py Normal file
View File

@ -0,0 +1,28 @@
from typing import Any
from falyx import logger
from falyx.parsers.signature import infer_args_from_func
def same_argument_definitions(
actions: list[Any],
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> list[dict[str, Any]] | None:
from falyx.action.action import BaseAction
arg_sets = []
for action in actions:
if isinstance(action, BaseAction):
infer_target, _ = action.get_infer_target()
arg_defs = infer_args_from_func(infer_target, arg_metadata)
elif callable(action):
arg_defs = infer_args_from_func(action, arg_metadata)
else:
logger.debug("Auto args unsupported for action: %s", action)
return None
arg_sets.append(arg_defs)
first = arg_sets[0]
if all(arg_set == first for arg_set in arg_sets[1:]):
return first
return None

View File

@ -10,7 +10,7 @@ from rich.markup import escape
from rich.table import Table
from falyx.themes import OneColors
from falyx.utils import chunks
from falyx.utils import CaseInsensitiveDict, chunks
from falyx.validators import int_range_validator, key_validator
@ -32,6 +32,62 @@ class SelectionOption:
return f"[{OneColors.WHITE}]{key}[/] [{self.style}]{self.description}[/]"
class SelectionOptionMap(CaseInsensitiveDict):
"""
Manages selection options including validation and reserved key protection.
"""
RESERVED_KEYS: set[str] = set()
def __init__(
self,
options: dict[str, SelectionOption] | None = None,
allow_reserved: bool = False,
):
super().__init__()
self.allow_reserved = allow_reserved
if options:
self.update(options)
def _add_reserved(self, key: str, option: SelectionOption) -> None:
"""Add a reserved key, bypassing validation."""
norm_key = key.upper()
super().__setitem__(norm_key, option)
def __setitem__(self, key: str, option: SelectionOption) -> None:
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
norm_key = key.upper()
if norm_key in self.RESERVED_KEYS and not self.allow_reserved:
raise ValueError(
f"Key '{key}' is reserved and cannot be used in SelectionOptionMap."
)
super().__setitem__(norm_key, option)
def __delitem__(self, key: str) -> None:
if key.upper() in self.RESERVED_KEYS and not self.allow_reserved:
raise ValueError(f"Cannot delete reserved option '{key}'.")
super().__delitem__(key)
def update(self, other=None, **kwargs):
"""Update the selection options with another dictionary."""
if other:
for key, option in other.items():
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
for key, option in kwargs.items():
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
def items(self, include_reserved: bool = True):
for k, v in super().items():
if not include_reserved and k in self.RESERVED_KEYS:
continue
yield k, v
def render_table_base(
title: str,
*,

View File

@ -1 +1 @@
__version__ = "0.1.28"
__version__ = "0.1.33"

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "falyx"
version = "0.1.28"
version = "0.1.33"
description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT"

View File

@ -56,102 +56,6 @@ def test_command_str():
)
@pytest.mark.parametrize(
"action_factory, expected_requires_input",
[
(lambda: Action(name="normal", action=dummy_action), False),
(lambda: DummyInputAction(name="io"), True),
(
lambda: ChainedAction(name="chain", actions=[DummyInputAction(name="io")]),
True,
),
(
lambda: ActionGroup(name="group", actions=[DummyInputAction(name="io")]),
True,
),
],
)
def test_command_requires_input_detection(action_factory, expected_requires_input):
action = action_factory()
cmd = Command(key="TEST", description="Test Command", action=action)
assert cmd.requires_input == expected_requires_input
if expected_requires_input:
assert cmd.hidden is True
else:
assert cmd.hidden is False
def test_requires_input_flag_detected_for_baseioaction():
"""Command should automatically detect requires_input=True for BaseIOAction."""
cmd = Command(
key="X",
description="Echo input",
action=DummyInputAction(name="dummy"),
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_requires_input_manual_override():
"""Command manually set requires_input=False should not auto-hide."""
cmd = Command(
key="Y",
description="Custom input command",
action=DummyInputAction(name="dummy"),
requires_input=False,
)
assert cmd.requires_input is False
assert cmd.hidden is False
def test_default_command_does_not_require_input():
"""Normal Command without IO Action should not require input."""
cmd = Command(
key="Z",
description="Simple action",
action=lambda: 42,
)
assert cmd.requires_input is False
assert cmd.hidden is False
def test_chain_requires_input():
"""If first action in a chain requires input, the command should require input."""
chain = ChainedAction(
name="ChainWithInput",
actions=[
DummyInputAction(name="dummy"),
Action(name="action1", action=lambda: 1),
],
)
cmd = Command(
key="A",
description="Chain with input",
action=chain,
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_group_requires_input():
"""If any action in a group requires input, the command should require input."""
group = ActionGroup(
name="GroupWithInput",
actions=[
Action(name="action1", action=lambda: 1),
DummyInputAction(name="dummy"),
],
)
cmd = Command(
key="B",
description="Group with input",
action=group,
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_enable_retry():
"""Command should enable retry if action is an Action and retry is set to True."""
cmd = Command(

View File

@ -1,7 +1,7 @@
import pytest
from falyx.argparse import ArgumentAction, CommandArgumentParser
from falyx.exceptions import CommandArgumentError
from falyx.parsers import ArgumentAction, CommandArgumentParser
from falyx.signals import HelpSignal