7 Commits

34 changed files with 884 additions and 488 deletions

View File

@ -6,7 +6,7 @@ from falyx.action import ActionFactoryAction, ChainedAction, HTTPAction, Selecti
# Selection of a post ID to fetch (just an example set)
post_selector = SelectionAction(
name="Pick Post ID",
selections=["1", "2", "3", "4", "5"],
selections=["15", "25", "35", "45", "55"],
title="Choose a Post ID to submit",
prompt_message="Post ID > ",
show_table=True,
@ -14,7 +14,7 @@ post_selector = SelectionAction(
# Factory that builds and executes the actual HTTP POST request
def build_post_action(post_id) -> HTTPAction:
async def build_post_action(post_id) -> HTTPAction:
print(f"Building HTTPAction for Post ID: {post_id}")
return HTTPAction(
name=f"POST to /posts (id={post_id})",

View File

@ -24,7 +24,6 @@ cmd = Command(
key="G",
description="Greet someone with multiple variations.",
action=group,
auto_args=True,
arg_metadata={
"name": {
"help": "The name of the person to greet.",

View File

@ -1,14 +1,18 @@
import asyncio
from falyx import Action, Falyx
from falyx import Action, ChainedAction, Falyx
from falyx.utils import setup_logging
setup_logging()
async def deploy(service: str, region: str = "us-east-1", verbose: bool = False):
async def deploy(service: str, region: str = "us-east-1", verbose: bool = False) -> str:
if verbose:
print(f"Deploying {service} to {region}...")
await asyncio.sleep(2)
if verbose:
print(f"{service} deployed successfully!")
return f"{service} deployed to {region}"
flx = Falyx("Deployment CLI")
@ -21,7 +25,6 @@ flx.add_command(
name="deploy_service",
action=deploy,
),
auto_args=True,
arg_metadata={
"service": "Service name",
"region": {"help": "Deployment region", "choices": ["us-east-1", "us-west-2"]},
@ -29,4 +32,23 @@ flx.add_command(
},
)
deploy_chain = ChainedAction(
name="DeployChain",
actions=[
Action(name="deploy_service", action=deploy),
Action(
name="notify",
action=lambda last_result: print(f"Notification: {last_result}"),
),
],
auto_inject=True,
)
flx.add_command(
key="N",
aliases=["notify"],
description="Deploy a service and notify.",
action=deploy_chain,
)
asyncio.run(flx.run())

View File

@ -4,7 +4,6 @@ from rich.console import Console
from falyx import ActionGroup, Falyx
from falyx.action import HTTPAction
from falyx.hook_manager import HookType
from falyx.hooks import ResultReporter
console = Console()
@ -49,7 +48,7 @@ action_group = ActionGroup(
reporter = ResultReporter()
action_group.hooks.register(
HookType.ON_SUCCESS,
"on_success",
reporter.report,
)

View File

@ -2,8 +2,16 @@ import asyncio
import time
from falyx import Falyx
from falyx.action import Action, ActionGroup, ChainedAction, MenuAction, ProcessAction
from falyx.action import (
Action,
ActionGroup,
ChainedAction,
MenuAction,
ProcessAction,
PromptMenuAction,
)
from falyx.menu import MenuOption, MenuOptionMap
from falyx.themes import OneColors
# Basic coroutine for Action
@ -77,20 +85,28 @@ parallel = ActionGroup(
process = ProcessAction(name="compute", action=heavy_computation)
menu_options = MenuOptionMap(
{
"A": MenuOption("Run basic Action", basic_action, style=OneColors.LIGHT_YELLOW),
"C": MenuOption("Run ChainedAction", chained, style=OneColors.MAGENTA),
"P": MenuOption("Run ActionGroup (parallel)", parallel, style=OneColors.CYAN),
"H": MenuOption("Run ProcessAction (heavy task)", process, style=OneColors.GREEN),
}
)
# Menu setup
menu = MenuAction(
name="main-menu",
title="Choose a task to run",
menu_options=MenuOptionMap(
{
"1": MenuOption("Run basic Action", basic_action),
"2": MenuOption("Run ChainedAction", chained),
"3": MenuOption("Run ActionGroup (parallel)", parallel),
"4": MenuOption("Run ProcessAction (heavy task)", process),
}
),
menu_options=menu_options,
)
prompt_menu = PromptMenuAction(
name="select-user",
menu_options=menu_options,
)
flx = Falyx(
@ -108,6 +124,13 @@ flx.add_command(
logging_hooks=True,
)
flx.add_command(
key="P",
description="Show Prompt Menu",
action=prompt_menu,
logging_hooks=True,
)
if __name__ == "__main__":
asyncio.run(flx.run())

View File

@ -3,7 +3,6 @@ import asyncio
from falyx import Action, ActionGroup, ChainedAction
from falyx import ExecutionRegistry as er
from falyx import ProcessAction
from falyx.hook_manager import HookType
from falyx.retry import RetryHandler, RetryPolicy
@ -47,7 +46,7 @@ def build_pipeline():
checkout = Action("Checkout", checkout_code)
analysis = ProcessAction("Static Analysis", run_static_analysis)
tests = Action("Run Tests", flaky_tests)
tests.hooks.register(HookType.ON_ERROR, retry_handler.retry_on_error)
tests.hooks.register("on_error", retry_handler.retry_on_error)
# Parallel deploys
deploy_group = ActionGroup(

View File

@ -1,22 +1,30 @@
import asyncio
from falyx.selection import (
SelectionOption,
prompt_for_selection,
render_selection_dict_table,
)
from falyx.action import SelectionAction
from falyx.selection import SelectionOption
from falyx.signals import CancelSignal
menu = {
"A": SelectionOption("Run diagnostics", lambda: print("Running diagnostics...")),
"B": SelectionOption("Deploy to staging", lambda: print("Deploying...")),
selections = {
"1": SelectionOption(
description="Production", value="3bc2616e-3696-11f0-a139-089204eb86ac"
),
"2": SelectionOption(
description="Staging", value="42f2cd84-3696-11f0-a139-089204eb86ac"
),
}
table = render_selection_dict_table(
title="Main Menu",
selections=menu,
select = SelectionAction(
name="Select Deployment",
selections=selections,
title="Select a Deployment",
columns=2,
prompt_message="> ",
return_type="value",
show_table=True,
)
key = asyncio.run(prompt_for_selection(menu.keys(), table))
print(f"You selected: {key}")
menu[key.upper()].value()
try:
print(asyncio.run(select()))
except CancelSignal:
print("Selection was cancelled.")

View File

@ -3,7 +3,6 @@ import asyncio
from falyx import Action, ChainedAction, Falyx
from falyx.action import ShellAction
from falyx.hook_manager import HookType
from falyx.hooks import ResultReporter
from falyx.utils import setup_logging
@ -42,12 +41,12 @@ reporter = ResultReporter()
a1 = Action("a1", a1, inject_last_result=True)
a1.hooks.register(
HookType.ON_SUCCESS,
"on_success",
reporter.report,
)
a2 = Action("a2", a2, inject_last_result=True)
a2.hooks.register(
HookType.ON_SUCCESS,
"on_success",
reporter.report,
)

View File

@ -12,7 +12,6 @@ from .command import Command
from .context import ExecutionContext, SharedContext
from .execution_registry import ExecutionRegistry
from .falyx import Falyx
from .hook_manager import HookType
logger = logging.getLogger("falyx")

View File

@ -18,6 +18,7 @@ from .action_factory import ActionFactoryAction
from .http_action import HTTPAction
from .io_action import BaseIOAction, ShellAction
from .menu_action import MenuAction
from .prompt_menu_action import PromptMenuAction
from .select_file_action import SelectFileAction
from .selection_action import SelectionAction
from .signal_action import SignalAction
@ -40,4 +41,5 @@ __all__ = [
"FallbackAction",
"LiteralInputAction",
"UserInputAction",
"PromptMenuAction",
]

View File

@ -47,6 +47,7 @@ from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.logger import logger
from falyx.options_manager import OptionsManager
from falyx.parsers.utils import same_argument_definitions
from falyx.retry import RetryHandler, RetryPolicy
from falyx.themes import OneColors
from falyx.utils import ensure_async
@ -61,8 +62,7 @@ class BaseAction(ABC):
inject_last_result (bool): Whether to inject the previous action's result
into kwargs.
inject_into (str): The name of the kwarg key to inject the result as
(default: 'last_result').
_requires_injection (bool): Whether the action requires input injection.
(default: 'last_result').
"""
def __init__(
@ -82,7 +82,6 @@ class BaseAction(ABC):
self.inject_last_result: bool = inject_last_result
self.inject_into: str = inject_into
self._never_prompt: bool = never_prompt
self._requires_injection: bool = False
self._skip_in_chain: bool = False
self.console = Console(color_system="auto")
self.options_manager: OptionsManager | None = None
@ -101,6 +100,14 @@ class BaseAction(ABC):
async def preview(self, parent: Tree | None = None):
raise NotImplementedError("preview must be implemented by subclasses")
@abstractmethod
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
"""
Returns the callable to be used for argument inference.
By default, it returns None.
"""
raise NotImplementedError("get_infer_target must be implemented by subclasses")
def set_options_manager(self, options_manager: OptionsManager) -> None:
self.options_manager = options_manager
@ -154,10 +161,6 @@ class BaseAction(ABC):
async def _write_stdout(self, data: str) -> None:
"""Override in subclasses that produce terminal output."""
def requires_io_injection(self) -> bool:
"""Checks to see if the action requires input injection."""
return self._requires_injection
def __repr__(self) -> str:
return str(self)
@ -246,6 +249,13 @@ class Action(BaseAction):
if policy.enabled:
self.enable_retry()
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
"""
Returns the callable to be used for argument inference.
By default, it returns the action itself.
"""
return self.action, None
async def _run(self, *args, **kwargs) -> Any:
combined_args = args + self.args
combined_kwargs = self._maybe_inject_last_result({**self.kwargs, **kwargs})
@ -477,6 +487,14 @@ class ChainedAction(BaseAction, ActionListMixin):
if hasattr(action, "register_teardown") and callable(action.register_teardown):
action.register_teardown(self.hooks)
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
if self.actions:
return self.actions[0].get_infer_target()
return None, None
def _clear_args(self):
return (), {}
async def _run(self, *args, **kwargs) -> list[Any]:
if not self.actions:
raise EmptyChainError(f"[{self.name}] No actions to execute.")
@ -505,12 +523,8 @@ class ChainedAction(BaseAction, ActionListMixin):
continue
shared_context.current_index = index
prepared = action.prepare(shared_context, self.options_manager)
last_result = shared_context.last_result()
try:
if self.requires_io_injection() and last_result is not None:
result = await prepared(**{prepared.inject_into: last_result})
else:
result = await prepared(*args, **updated_kwargs)
result = await prepared(*args, **updated_kwargs)
except Exception as error:
if index + 1 < len(self.actions) and isinstance(
self.actions[index + 1], FallbackAction
@ -529,6 +543,7 @@ class ChainedAction(BaseAction, ActionListMixin):
fallback._skip_in_chain = True
else:
raise
args, updated_kwargs = self._clear_args()
shared_context.add_result(result)
context.extra["results"].append(result)
context.extra["rollback_stack"].append(prepared)
@ -669,6 +684,16 @@ class ActionGroup(BaseAction, ActionListMixin):
if hasattr(action, "register_teardown") and callable(action.register_teardown):
action.register_teardown(self.hooks)
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
arg_defs = same_argument_definitions(self.actions)
if arg_defs:
return self.actions[0].get_infer_target()
logger.debug(
"[%s] auto_args disabled: mismatched ActionGroup arguments",
self.name,
)
return None, None
async def _run(self, *args, **kwargs) -> list[tuple[str, Any]]:
shared_context = SharedContext(name=self.name, action=self, is_parallel=True)
if self.shared_context:
@ -701,7 +726,7 @@ class ActionGroup(BaseAction, ActionListMixin):
if context.extra["errors"]:
context.exception = Exception(
f"{len(context.extra['errors'])} action(s) failed: "
f"{' ,'.join(name for name, _ in context.extra["errors"])}"
f"{' ,'.join(name for name, _ in context.extra['errors'])}"
)
await self.hooks.trigger(HookType.ON_ERROR, context)
raise context.exception
@ -787,8 +812,11 @@ class ProcessAction(BaseAction):
self.executor = executor or ProcessPoolExecutor()
self.is_retryable = True
async def _run(self, *args, **kwargs):
if self.inject_last_result:
def get_infer_target(self) -> tuple[Callable[..., Any] | None, None]:
return self.action, None
async def _run(self, *args, **kwargs) -> Any:
if self.inject_last_result and self.shared_context:
last_result = self.shared_context.last_result()
if not self._validate_pickleable(last_result):
raise ValueError(

View File

@ -1,6 +1,6 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""action_factory.py"""
from typing import Any
from typing import Any, Callable
from rich.tree import Tree
@ -35,6 +35,8 @@ class ActionFactoryAction(BaseAction):
*,
inject_last_result: bool = False,
inject_into: str = "last_result",
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
preview_args: tuple[Any, ...] = (),
preview_kwargs: dict[str, Any] | None = None,
):
@ -44,6 +46,8 @@ class ActionFactoryAction(BaseAction):
inject_into=inject_into,
)
self.factory = factory
self.args = args
self.kwargs = kwargs or {}
self.preview_args = preview_args
self.preview_kwargs = preview_kwargs or {}
@ -55,7 +59,12 @@ class ActionFactoryAction(BaseAction):
def factory(self, value: ActionFactoryProtocol):
self._factory = ensure_async(value)
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
return self.factory, None
async def _run(self, *args, **kwargs) -> Any:
args = (*self.args, *args)
kwargs = {**self.kwargs, **kwargs}
updated_kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(
name=f"{self.name} (factory)",
@ -85,7 +94,7 @@ class ActionFactoryAction(BaseAction):
)
if self.options_manager:
generated_action.set_options_manager(self.options_manager)
context.result = await generated_action(*args, **kwargs)
context.result = await generated_action()
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result
except Exception as error:

View File

@ -19,7 +19,7 @@ import asyncio
import shlex
import subprocess
import sys
from typing import Any
from typing import Any, Callable
from rich.tree import Tree
@ -73,7 +73,6 @@ class BaseIOAction(BaseAction):
inject_last_result=inject_last_result,
)
self.mode = mode
self._requires_injection = True
def from_input(self, raw: str | bytes) -> Any:
raise NotImplementedError
@ -81,15 +80,15 @@ class BaseIOAction(BaseAction):
def to_output(self, result: Any) -> str | bytes:
raise NotImplementedError
async def _resolve_input(self, kwargs: dict[str, Any]) -> str | bytes:
last_result = kwargs.pop(self.inject_into, None)
async def _resolve_input(
self, args: tuple[Any], kwargs: dict[str, Any]
) -> str | bytes:
data = await self._read_stdin()
if data:
return self.from_input(data)
if last_result is not None:
return last_result
if len(args) == 1:
return self.from_input(args[0])
if self.inject_last_result and self.shared_context:
return self.shared_context.last_result()
@ -99,6 +98,9 @@ class BaseIOAction(BaseAction):
)
raise FalyxError("No input provided and no last result to inject.")
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
return None, None
async def __call__(self, *args, **kwargs):
context = ExecutionContext(
name=self.name,
@ -117,8 +119,8 @@ class BaseIOAction(BaseAction):
pass
result = getattr(self, "_last_result", None)
else:
parsed_input = await self._resolve_input(kwargs)
result = await self._run(parsed_input, *args, **kwargs)
parsed_input = await self._resolve_input(args, kwargs)
result = await self._run(parsed_input)
output = self.to_output(result)
await self._write_stdout(output)
context.result = result
@ -195,7 +197,6 @@ class ShellAction(BaseIOAction):
- Captures stdout and stderr from shell execution
- Raises on non-zero exit codes with stderr as the error
- Result is returned as trimmed stdout string
- Compatible with ChainedAction and Command.requires_input detection
Args:
name (str): Name of the action.
@ -220,6 +221,11 @@ class ShellAction(BaseIOAction):
)
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
if sys.stdin.isatty():
return self._run, {"parsed_input": {"help": self.command_template}}
return None, None
async def _run(self, parsed_input: str) -> str:
# Replace placeholder in template, or use raw input as full command
command = self.command_template.format(parsed_input)

View File

@ -73,6 +73,9 @@ class MenuAction(BaseAction):
table.add_row(*row)
return table
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any:
kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(

View File

@ -0,0 +1,134 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""prompt_menu_action.py"""
from typing import Any
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import FormattedText, merge_formatted_text
from rich.console import Console
from rich.tree import Tree
from falyx.action.action import BaseAction
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType
from falyx.logger import logger
from falyx.menu import MenuOptionMap
from falyx.signals import BackSignal, QuitSignal
from falyx.themes import OneColors
class PromptMenuAction(BaseAction):
"""PromptMenuAction class for creating prompt -> actions."""
def __init__(
self,
name: str,
menu_options: MenuOptionMap,
*,
prompt_message: str = "Select > ",
default_selection: str = "",
inject_last_result: bool = False,
inject_into: str = "last_result",
console: Console | None = None,
prompt_session: PromptSession | None = None,
never_prompt: bool = False,
include_reserved: bool = True,
):
super().__init__(
name,
inject_last_result=inject_last_result,
inject_into=inject_into,
never_prompt=never_prompt,
)
self.menu_options = menu_options
self.prompt_message = prompt_message
self.default_selection = default_selection
self.console = console or Console(color_system="auto")
self.prompt_session = prompt_session or PromptSession()
self.include_reserved = include_reserved
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any:
kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(
name=self.name,
args=args,
kwargs=kwargs,
action=self,
)
effective_default = self.default_selection
maybe_result = str(self.last_result)
if maybe_result in self.menu_options:
effective_default = maybe_result
elif self.inject_last_result:
logger.warning(
"[%s] Injected last result '%s' not found in menu options",
self.name,
maybe_result,
)
if self.never_prompt and not effective_default:
raise ValueError(
f"[{self.name}] 'never_prompt' is True but no valid default_selection"
" was provided."
)
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
key = effective_default
if not self.never_prompt:
placeholder_formatted_text = []
for index, (key, option) in enumerate(self.menu_options.items()):
placeholder_formatted_text.append(option.render_prompt(key))
if index < len(self.menu_options) - 1:
placeholder_formatted_text.append(
FormattedText([(OneColors.WHITE, " | ")])
)
placeholder = merge_formatted_text(placeholder_formatted_text)
key = await self.prompt_session.prompt_async(
message=self.prompt_message, placeholder=placeholder
)
option = self.menu_options[key]
result = await option.action(*args, **kwargs)
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return result
except BackSignal:
logger.debug("[%s][BackSignal] ← Returning to previous menu", self.name)
return None
except QuitSignal:
logger.debug("[%s][QuitSignal] ← Exiting application", self.name)
raise
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
raise
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.LIGHT_YELLOW_b}]📋 PromptMenuAction[/] '{self.name}'"
tree = parent.add(label) if parent else Tree(label)
for key, option in self.menu_options.items():
tree.add(
f"[dim]{key}[/]: {option.description} → [italic]{option.action.name}[/]"
)
await option.action.preview(parent=tree)
if not parent:
self.console.print(tree)
def __str__(self) -> str:
return (
f"PromptMenuAction(name={self.name!r}, options={list(self.menu_options.keys())!r}, "
f"default_selection={self.default_selection!r}, "
f"include_reserved={self.include_reserved}, "
f"prompt={'off' if self.never_prompt else 'on'})"
)

View File

@ -25,6 +25,7 @@ from falyx.selection import (
prompt_for_selection,
render_selection_dict_table,
)
from falyx.signals import CancelSignal
from falyx.themes import OneColors
@ -121,6 +122,16 @@ class SelectFileAction(BaseAction):
logger.warning("[ERROR] Failed to parse %s: %s", file.name, error)
return options
def _find_cancel_key(self, options) -> str:
"""Return first numeric value not already used in the selection dict."""
for index in range(len(options)):
if str(index) not in options:
return str(index)
return str(len(options))
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any:
context = ExecutionContext(name=self.name, args=args, kwargs=kwargs, action=self)
context.start_timer()
@ -128,28 +139,38 @@ class SelectFileAction(BaseAction):
await self.hooks.trigger(HookType.BEFORE, context)
files = [
f
for f in self.directory.iterdir()
if f.is_file()
and (self.suffix_filter is None or f.suffix == self.suffix_filter)
file
for file in self.directory.iterdir()
if file.is_file()
and (self.suffix_filter is None or file.suffix == self.suffix_filter)
]
if not files:
raise FileNotFoundError("No files found in directory.")
options = self.get_options(files)
cancel_key = self._find_cancel_key(options)
cancel_option = {
cancel_key: SelectionOption(
description="Cancel", value=CancelSignal(), style=OneColors.DARK_RED
)
}
table = render_selection_dict_table(
title=self.title, selections=options, columns=self.columns
title=self.title, selections=options | cancel_option, columns=self.columns
)
key = await prompt_for_selection(
options.keys(),
(options | cancel_option).keys(),
table,
console=self.console,
prompt_session=self.prompt_session,
prompt_message=self.prompt_message,
)
if key == cancel_key:
raise CancelSignal("User canceled the selection.")
result = options[key].value
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
@ -176,11 +197,11 @@ class SelectFileAction(BaseAction):
try:
files = list(self.directory.iterdir())
if self.suffix_filter:
files = [f for f in files if f.suffix == self.suffix_filter]
files = [file for file in files if file.suffix == self.suffix_filter]
sample = files[:10]
file_list = tree.add("[dim]Files:[/]")
for f in sample:
file_list.add(f"[dim]{f.name}[/]")
for file in sample:
file_list.add(f"[dim]{file.name}[/]")
if len(files) > 10:
file_list.add(f"[dim]... ({len(files) - 10} more)[/]")
except Exception as error:

View File

@ -1,5 +1,6 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""selection_action.py"""
from copy import copy
from typing import Any
from prompt_toolkit import PromptSession
@ -7,19 +8,21 @@ from rich.console import Console
from rich.tree import Tree
from falyx.action.action import BaseAction
from falyx.action.types import SelectionReturnType
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType
from falyx.logger import logger
from falyx.selection import (
SelectionOption,
SelectionOptionMap,
prompt_for_index,
prompt_for_selection,
render_selection_dict_table,
render_selection_indexed_table,
)
from falyx.signals import CancelSignal
from falyx.themes import OneColors
from falyx.utils import CaseInsensitiveDict
class SelectionAction(BaseAction):
@ -34,7 +37,13 @@ class SelectionAction(BaseAction):
def __init__(
self,
name: str,
selections: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption],
selections: (
list[str]
| set[str]
| tuple[str, ...]
| dict[str, SelectionOption]
| dict[str, Any]
),
*,
title: str = "Select an option",
columns: int = 5,
@ -42,7 +51,7 @@ class SelectionAction(BaseAction):
default_selection: str = "",
inject_last_result: bool = False,
inject_into: str = "last_result",
return_key: bool = False,
return_type: SelectionReturnType | str = "value",
console: Console | None = None,
prompt_session: PromptSession | None = None,
never_prompt: bool = False,
@ -55,8 +64,8 @@ class SelectionAction(BaseAction):
never_prompt=never_prompt,
)
# Setter normalizes to correct type, mypy can't infer that
self.selections: list[str] | CaseInsensitiveDict = selections # type: ignore[assignment]
self.return_key = return_key
self.selections: list[str] | SelectionOptionMap = selections # type: ignore[assignment]
self.return_type: SelectionReturnType = self._coerce_return_type(return_type)
self.title = title
self.columns = columns
self.console = console or Console(color_system="auto")
@ -64,9 +73,17 @@ class SelectionAction(BaseAction):
self.default_selection = default_selection
self.prompt_message = prompt_message
self.show_table = show_table
self.cancel_key = self._find_cancel_key()
def _coerce_return_type(
self, return_type: SelectionReturnType | str
) -> SelectionReturnType:
if isinstance(return_type, SelectionReturnType):
return return_type
return SelectionReturnType(return_type)
@property
def selections(self) -> list[str] | CaseInsensitiveDict:
def selections(self) -> list[str] | SelectionOptionMap:
return self._selections
@selections.setter
@ -74,17 +91,69 @@ class SelectionAction(BaseAction):
self, value: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption]
):
if isinstance(value, (list, tuple, set)):
self._selections: list[str] | CaseInsensitiveDict = list(value)
self._selections: list[str] | SelectionOptionMap = list(value)
elif isinstance(value, dict):
cid = CaseInsensitiveDict()
cid.update(value)
self._selections = cid
som = SelectionOptionMap()
if all(isinstance(key, str) for key in value) and all(
not isinstance(value[key], SelectionOption) for key in value
):
som.update(
{
str(index): SelectionOption(key, option)
for index, (key, option) in enumerate(value.items())
}
)
elif all(isinstance(key, str) for key in value) and all(
isinstance(value[key], SelectionOption) for key in value
):
som.update(value)
else:
raise ValueError("Invalid dictionary format. Keys must be strings")
self._selections = som
else:
raise TypeError(
"'selections' must be a list[str] or dict[str, SelectionOption], "
f"got {type(value).__name__}"
)
def _find_cancel_key(self) -> str:
"""Find the cancel key in the selections."""
if isinstance(self.selections, dict):
for index in range(len(self.selections) + 1):
if str(index) not in self.selections:
return str(index)
return str(len(self.selections))
@property
def cancel_key(self) -> str:
return self._cancel_key
@cancel_key.setter
def cancel_key(self, value: str) -> None:
"""Set the cancel key for the selection."""
if not isinstance(value, str):
raise TypeError("Cancel key must be a string.")
if isinstance(self.selections, dict) and value in self.selections:
raise ValueError(
"Cancel key cannot be one of the selection keys. "
f"Current selections: {self.selections}"
)
if isinstance(self.selections, list):
if not value.isdigit() or int(value) > len(self.selections):
raise ValueError(
"cancel_key must be a digit and not greater than the number of selections."
)
self._cancel_key = value
def cancel_formatter(self, index: int, selection: str) -> str:
"""Format the cancel option for display."""
if self.cancel_key == str(index):
return f"[{index}] [{OneColors.DARK_RED}]Cancel[/]"
return f"[{index}] {selection}"
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any:
kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(
@ -125,16 +194,18 @@ class SelectionAction(BaseAction):
context.start_timer()
try:
self.cancel_key = self._find_cancel_key()
await self.hooks.trigger(HookType.BEFORE, context)
if isinstance(self.selections, list):
table = render_selection_indexed_table(
title=self.title,
selections=self.selections,
selections=self.selections + ["Cancel"],
columns=self.columns,
formatter=self.cancel_formatter,
)
if not self.never_prompt:
index = await prompt_for_index(
len(self.selections) - 1,
index: int | str = await prompt_for_index(
len(self.selections),
table,
default_selection=effective_default,
console=self.console,
@ -144,14 +215,23 @@ class SelectionAction(BaseAction):
)
else:
index = effective_default
result = self.selections[int(index)]
if int(index) == int(self.cancel_key):
raise CancelSignal("User cancelled the selection.")
result: Any = self.selections[int(index)]
elif isinstance(self.selections, dict):
cancel_option = {
self.cancel_key: SelectionOption(
description="Cancel", value=CancelSignal, style=OneColors.DARK_RED
)
}
table = render_selection_dict_table(
title=self.title, selections=self.selections, columns=self.columns
title=self.title,
selections=self.selections | cancel_option,
columns=self.columns,
)
if not self.never_prompt:
key = await prompt_for_selection(
self.selections.keys(),
(self.selections | cancel_option).keys(),
table,
default_selection=effective_default,
console=self.console,
@ -161,10 +241,25 @@ class SelectionAction(BaseAction):
)
else:
key = effective_default
result = key if self.return_key else self.selections[key].value
if key == self.cancel_key:
raise CancelSignal("User cancelled the selection.")
if self.return_type == SelectionReturnType.KEY:
result = key
elif self.return_type == SelectionReturnType.VALUE:
result = self.selections[key].value
elif self.return_type == SelectionReturnType.ITEMS:
result = {key: self.selections[key]}
elif self.return_type == SelectionReturnType.DESCRIPTION:
result = self.selections[key].description
elif self.return_type == SelectionReturnType.DESCRIPTION_VALUE:
result = {
self.selections[key].description: self.selections[key].value
}
else:
raise ValueError(f"Unsupported return type: {self.return_type}")
else:
raise TypeError(
"'selections' must be a list[str] or dict[str, tuple[str, Any]], "
"'selections' must be a list[str] or dict[str, Any], "
f"got {type(self.selections).__name__}"
)
context.result = result
@ -203,7 +298,7 @@ class SelectionAction(BaseAction):
return
tree.add(f"[dim]Default:[/] '{self.default_selection or self.last_result}'")
tree.add(f"[dim]Return:[/] {'Key' if self.return_key else 'Value'}")
tree.add(f"[dim]Return:[/] {self.return_type.name.capitalize()}")
tree.add(f"[dim]Prompt:[/] {'Disabled' if self.never_prompt else 'Enabled'}")
if not parent:
@ -218,6 +313,6 @@ class SelectionAction(BaseAction):
return (
f"SelectionAction(name={self.name!r}, type={selection_type}, "
f"default_selection={self.default_selection!r}, "
f"return_key={self.return_key}, "
f"return_type={self.return_type!r}, "
f"prompt={'off' if self.never_prompt else 'on'})"
)

View File

@ -35,3 +35,18 @@ class FileReturnType(Enum):
return member
valid = ", ".join(member.value for member in cls)
raise ValueError(f"Invalid FileReturnType: '{value}'. Must be one of: {valid}")
class SelectionReturnType(Enum):
"""Enum for dictionary return types."""
KEY = "key"
VALUE = "value"
DESCRIPTION = "description"
DESCRIPTION_VALUE = "description_value"
ITEMS = "items"
@classmethod
def _missing_(cls, value: object) -> SelectionReturnType:
valid = ", ".join(member.value for member in cls)
raise ValueError(f"Invalid DictReturnType: '{value}'. Must be one of: {valid}")

View File

@ -43,6 +43,9 @@ class UserInputAction(BaseAction):
self.console = console or Console(color_system="auto")
self.prompt_session = prompt_session or PromptSession()
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> str:
context = ExecutionContext(
name=self.name,

View File

@ -19,7 +19,6 @@ in building robust interactive menus.
from __future__ import annotations
import shlex
from functools import cached_property
from typing import Any, Callable
from prompt_toolkit.formatted_text import FormattedText
@ -27,25 +26,15 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from rich.console import Console
from rich.tree import Tree
from falyx.action.action import (
Action,
ActionGroup,
BaseAction,
ChainedAction,
ProcessAction,
)
from falyx.action.io_action import BaseIOAction
from falyx.action.action import Action, BaseAction
from falyx.context import ExecutionContext
from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.logger import logger
from falyx.options_manager import OptionsManager
from falyx.parsers import (
CommandArgumentParser,
infer_args_from_func,
same_argument_definitions,
)
from falyx.parsers.argparse import CommandArgumentParser
from falyx.parsers.signature import infer_args_from_func
from falyx.prompt_utils import confirm_async, should_prompt_user
from falyx.protocols import ArgParserProtocol
from falyx.retry import RetryPolicy
@ -99,7 +88,6 @@ class Command(BaseModel):
retry_policy (RetryPolicy): Retry behavior configuration.
tags (list[str]): Organizational tags for the command.
logging_hooks (bool): Whether to attach logging hooks automatically.
requires_input (bool | None): Indicates if the action needs input.
options_manager (OptionsManager): Manages global command-line options.
arg_parser (CommandArgumentParser): Parses command arguments.
custom_parser (ArgParserProtocol | None): Custom argument parser.
@ -116,7 +104,7 @@ class Command(BaseModel):
key: str
description: str
action: BaseAction | Callable[[Any], Any]
action: BaseAction | Callable[..., Any]
args: tuple = ()
kwargs: dict[str, Any] = Field(default_factory=dict)
hidden: bool = False
@ -138,24 +126,23 @@ class Command(BaseModel):
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
tags: list[str] = Field(default_factory=list)
logging_hooks: bool = False
requires_input: bool | None = None
options_manager: OptionsManager = Field(default_factory=OptionsManager)
arg_parser: CommandArgumentParser = Field(default_factory=CommandArgumentParser)
arguments: list[dict[str, Any]] = Field(default_factory=list)
argument_config: Callable[[CommandArgumentParser], None] | None = None
custom_parser: ArgParserProtocol | None = None
custom_help: Callable[[], str | None] | None = None
auto_args: bool = False
auto_args: bool = True
arg_metadata: dict[str, str | dict[str, Any]] = Field(default_factory=dict)
_context: ExecutionContext | None = PrivateAttr(default=None)
model_config = ConfigDict(arbitrary_types_allowed=True)
def parse_args(
async def parse_args(
self, raw_args: list[str] | str, from_validate: bool = False
) -> tuple[tuple, dict]:
if self.custom_parser:
if callable(self.custom_parser):
if isinstance(raw_args, str):
try:
raw_args = shlex.split(raw_args)
@ -178,7 +165,9 @@ class Command(BaseModel):
raw_args,
)
return ((), {})
return self.arg_parser.parse_args_split(raw_args, from_validate=from_validate)
return await self.arg_parser.parse_args_split(
raw_args, from_validate=from_validate
)
@field_validator("action", mode="before")
@classmethod
@ -192,28 +181,15 @@ class Command(BaseModel):
def get_argument_definitions(self) -> list[dict[str, Any]]:
if self.arguments:
return self.arguments
elif self.argument_config:
elif callable(self.argument_config):
self.argument_config(self.arg_parser)
elif self.auto_args:
if isinstance(self.action, (Action, ProcessAction)):
return infer_args_from_func(self.action.action, self.arg_metadata)
elif isinstance(self.action, ChainedAction):
if self.action.actions:
action = self.action.actions[0]
if isinstance(action, Action):
return infer_args_from_func(action.action, self.arg_metadata)
elif callable(action):
return infer_args_from_func(action, self.arg_metadata)
elif isinstance(self.action, ActionGroup):
arg_defs = same_argument_definitions(
self.action.actions, self.arg_metadata
)
if arg_defs:
return arg_defs
logger.debug(
"[Command:%s] auto_args disabled: mismatched ActionGroup arguments",
self.key,
)
if isinstance(self.action, BaseAction):
infer_target, maybe_metadata = self.action.get_infer_target()
# merge metadata with the action's metadata if not already in self.arg_metadata
if maybe_metadata:
self.arg_metadata = {**maybe_metadata, **self.arg_metadata}
return infer_args_from_func(infer_target, self.arg_metadata)
elif callable(self.action):
return infer_args_from_func(self.action, self.arg_metadata)
return []
@ -241,30 +217,9 @@ class Command(BaseModel):
if self.logging_hooks and isinstance(self.action, BaseAction):
register_debug_hooks(self.action.hooks)
if self.requires_input is None and self.detect_requires_input:
self.requires_input = True
self.hidden = True
elif self.requires_input is None:
self.requires_input = False
for arg_def in self.get_argument_definitions():
self.arg_parser.add_argument(*arg_def.pop("flags"), **arg_def)
@cached_property
def detect_requires_input(self) -> bool:
"""Detect if the action requires input based on its type."""
if isinstance(self.action, BaseIOAction):
return True
elif isinstance(self.action, ChainedAction):
return (
isinstance(self.action.actions[0], BaseIOAction)
if self.action.actions
else False
)
elif isinstance(self.action, ActionGroup):
return any(isinstance(action, BaseIOAction) for action in self.action.actions)
return False
def _inject_options_manager(self) -> None:
"""Inject the options manager into the action if applicable."""
if isinstance(self.action, BaseAction):
@ -357,7 +312,7 @@ class Command(BaseModel):
def show_help(self) -> bool:
"""Display the help message for the command."""
if self.custom_help:
if callable(self.custom_help):
output = self.custom_help()
if output:
console.print(output)

View File

@ -98,7 +98,6 @@ class RawCommand(BaseModel):
retry: bool = False
retry_all: bool = False
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
requires_input: bool | None = None
hidden: bool = False
help_text: str = ""

View File

@ -61,9 +61,9 @@ from falyx.options_manager import OptionsManager
from falyx.parsers import CommandArgumentParser, get_arg_parsers
from falyx.protocols import ArgParserProtocol
from falyx.retry import RetryPolicy
from falyx.signals import BackSignal, CancelSignal, FlowSignal, HelpSignal, QuitSignal
from falyx.signals import BackSignal, CancelSignal, HelpSignal, QuitSignal
from falyx.themes import OneColors, get_nord_theme
from falyx.utils import CaseInsensitiveDict, _noop, chunks, get_program_invocation
from falyx.utils import CaseInsensitiveDict, _noop, chunks
from falyx.version import __version__
@ -83,14 +83,17 @@ class CommandValidator(Validator):
self.error_message = error_message
def validate(self, document) -> None:
pass
async def validate_async(self, document) -> None:
text = document.text
is_preview, choice, _, __ = self.falyx.get_command(text, from_validate=True)
is_preview, choice, _, __ = await self.falyx.get_command(text, from_validate=True)
if is_preview:
return None
if not choice:
raise ValidationError(
message=self.error_message,
cursor_position=document.get_end_of_document_position(),
cursor_position=len(text),
)
@ -111,6 +114,8 @@ class Falyx:
- Submenu nesting and action chaining
- History tracking, help generation, and run key execution modes
- Seamless CLI argument parsing and integration via argparse
- Declarative option management with OptionsManager
- Command level argument parsing and validation
- Extensible with user-defined hooks, bottom bars, and custom layouts
Args:
@ -126,7 +131,7 @@ class Falyx:
never_prompt (bool): Seed default for `OptionsManager["never_prompt"]`
force_confirm (bool): Seed default for `OptionsManager["force_confirm"]`
cli_args (Namespace | None): Parsed CLI arguments, usually from argparse.
options (OptionsManager | None): Declarative option mappings.
options (OptionsManager | None): Declarative option mappings for global state.
custom_table (Callable[[Falyx], Table] | Table | None): Custom menu table
generator.
@ -158,8 +163,9 @@ class Falyx:
force_confirm: bool = False,
cli_args: Namespace | None = None,
options: OptionsManager | None = None,
render_menu: Callable[["Falyx"], None] | None = None,
custom_table: Callable[["Falyx"], Table] | Table | None = None,
render_menu: Callable[[Falyx], None] | None = None,
custom_table: Callable[[Falyx], Table] | Table | None = None,
hide_menu_table: bool = False,
) -> None:
"""Initializes the Falyx object."""
self.title: str | Markdown = title
@ -183,8 +189,9 @@ class Falyx:
self._never_prompt: bool = never_prompt
self._force_confirm: bool = force_confirm
self.cli_args: Namespace | None = cli_args
self.render_menu: Callable[["Falyx"], None] | None = render_menu
self.custom_table: Callable[["Falyx"], Table] | Table | None = custom_table
self.render_menu: Callable[[Falyx], None] | None = render_menu
self.custom_table: Callable[[Falyx], Table] | Table | None = custom_table
self._hide_menu_table: bool = hide_menu_table
self.validate_options(cli_args, options)
self._prompt_session: PromptSession | None = None
self.mode = FalyxMode.MENU
@ -287,8 +294,6 @@ class Falyx:
for command in self.commands.values():
help_text = command.help_text or command.description
if command.requires_input:
help_text += " [dim](requires input)[/dim]"
table.add_row(
f"[{command.style}]{command.key}[/]",
", ".join(command.aliases) if command.aliases else "",
@ -445,7 +450,6 @@ class Falyx:
bottom_toolbar=self._get_bottom_bar_render(),
key_bindings=self.key_bindings,
validate_while_typing=False,
interrupt_exception=FlowSignal,
)
return self._prompt_session
@ -526,7 +530,7 @@ class Falyx:
key: str = "X",
description: str = "Exit",
aliases: list[str] | None = None,
action: Callable[[Any], Any] | None = None,
action: Callable[..., Any] | None = None,
style: str = OneColors.DARK_RED,
confirm: bool = False,
confirm_message: str = "Are you sure?",
@ -580,7 +584,7 @@ class Falyx:
self,
key: str,
description: str,
action: BaseAction | Callable[[Any], Any],
action: BaseAction | Callable[..., Any],
*,
args: tuple = (),
kwargs: dict[str, Any] | None = None,
@ -608,13 +612,12 @@ class Falyx:
retry: bool = False,
retry_all: bool = False,
retry_policy: RetryPolicy | None = None,
requires_input: bool | None = None,
arg_parser: CommandArgumentParser | None = None,
arguments: list[dict[str, Any]] | None = None,
argument_config: Callable[[CommandArgumentParser], None] | None = None,
custom_parser: ArgParserProtocol | None = None,
custom_help: Callable[[], str | None] | None = None,
auto_args: bool = False,
auto_args: bool = True,
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> Command:
"""Adds an command to the menu, preventing duplicates."""
@ -660,7 +663,6 @@ class Falyx:
retry=retry,
retry_all=retry_all,
retry_policy=retry_policy or RetryPolicy(),
requires_input=requires_input,
options_manager=self.options,
arg_parser=arg_parser,
arguments=arguments or [],
@ -741,7 +743,7 @@ class Falyx:
return True, input_str[1:].strip()
return False, input_str.strip()
def get_command(
async def get_command(
self, raw_choices: str, from_validate=False
) -> tuple[bool, Command | None, tuple, dict[str, Any]]:
"""
@ -768,26 +770,29 @@ class Falyx:
choice = choice.upper()
name_map = self._name_map
if choice in name_map:
if name_map.get(choice):
if not from_validate:
logger.info("Command '%s' selected.", choice)
if input_args and name_map[choice].arg_parser:
try:
args, kwargs = name_map[choice].parse_args(input_args, from_validate)
except CommandArgumentError as error:
if not from_validate:
if not name_map[choice].show_help():
self.console.print(
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
)
else:
name_map[choice].show_help()
raise ValidationError(
message=str(error), cursor_position=len(raw_choices)
if is_preview:
return True, name_map[choice], args, kwargs
try:
args, kwargs = await name_map[choice].parse_args(
input_args, from_validate
)
except CommandArgumentError as error:
if not from_validate:
if not name_map[choice].show_help():
self.console.print(
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
)
return is_preview, None, args, kwargs
except HelpSignal:
return True, None, args, kwargs
else:
name_map[choice].show_help()
raise ValidationError(
message=str(error), cursor_position=len(raw_choices)
)
return is_preview, None, args, kwargs
except HelpSignal:
return True, None, args, kwargs
return is_preview, name_map[choice], args, kwargs
prefix_matches = [cmd for key, cmd in name_map.items() if key.startswith(choice)]
@ -834,7 +839,7 @@ class Falyx:
"""Processes the action of the selected command."""
with patch_stdout(raw=True):
choice = await self.prompt_session.prompt_async()
is_preview, selected_command, args, kwargs = self.get_command(choice)
is_preview, selected_command, args, kwargs = await self.get_command(choice)
if not selected_command:
logger.info("Invalid command '%s'.", choice)
return True
@ -844,15 +849,6 @@ class Falyx:
await selected_command.preview()
return True
if selected_command.requires_input:
program = get_program_invocation()
self.console.print(
f"[{OneColors.LIGHT_YELLOW}]⚠️ Command '{selected_command.key}' requires"
f" input and must be run via [{OneColors.MAGENTA}]'{program} run"
f"'[{OneColors.LIGHT_YELLOW}] with proper piping or arguments.[/]"
)
return True
self.last_run_command = selected_command
if selected_command == self.exit_command:
@ -885,7 +881,7 @@ class Falyx:
) -> Any:
"""Run a command by key without displaying the menu (non-interactive mode)."""
self.debug_hooks()
is_preview, selected_command, _, __ = self.get_command(command_key)
is_preview, selected_command, _, __ = await self.get_command(command_key)
kwargs = kwargs or {}
self.last_run_command = selected_command
@ -984,10 +980,11 @@ class Falyx:
self.print_message(self.welcome_message)
try:
while True:
if callable(self.render_menu):
self.render_menu(self)
else:
self.console.print(self.table, justify="center")
if not self.options.get("hide_menu_table", self._hide_menu_table):
if callable(self.render_menu):
self.render_menu(self)
else:
self.console.print(self.table, justify="center")
try:
task = asyncio.create_task(self.process_command())
should_continue = await task
@ -1020,6 +1017,9 @@ class Falyx:
if not self.options.get("force_confirm"):
self.options.set("force_confirm", self._force_confirm)
if not self.options.get("hide_menu_table"):
self.options.set("hide_menu_table", self._hide_menu_table)
if self.cli_args.verbose:
logging.getLogger("falyx").setLevel(logging.DEBUG)
@ -1037,7 +1037,7 @@ class Falyx:
if self.cli_args.command == "preview":
self.mode = FalyxMode.PREVIEW
_, command, args, kwargs = self.get_command(self.cli_args.name)
_, command, args, kwargs = await self.get_command(self.cli_args.name)
if not command:
self.console.print(
f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found."
@ -1051,7 +1051,7 @@ class Falyx:
if self.cli_args.command == "run":
self.mode = FalyxMode.RUN
is_preview, command, _, __ = self.get_command(self.cli_args.name)
is_preview, command, _, __ = await self.get_command(self.cli_args.name)
if is_preview:
if command is None:
sys.exit(1)
@ -1062,7 +1062,7 @@ class Falyx:
sys.exit(1)
self._set_retry_policy(command)
try:
args, kwargs = command.parse_args(self.cli_args.command_args)
args, kwargs = await command.parse_args(self.cli_args.command_args)
except HelpSignal:
sys.exit(0)
try:

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import inspect
from enum import Enum
from typing import Awaitable, Callable, Dict, List, Optional, Union
from typing import Awaitable, Callable, Union
from falyx.context import ExecutionContext
from falyx.logger import logger
@ -24,7 +24,7 @@ class HookType(Enum):
ON_TEARDOWN = "on_teardown"
@classmethod
def choices(cls) -> List[HookType]:
def choices(cls) -> list[HookType]:
"""Return a list of all hook type choices."""
return list(cls)
@ -37,16 +37,17 @@ class HookManager:
"""HookManager"""
def __init__(self) -> None:
self._hooks: Dict[HookType, List[Hook]] = {
self._hooks: dict[HookType, list[Hook]] = {
hook_type: [] for hook_type in HookType
}
def register(self, hook_type: HookType, hook: Hook):
if hook_type not in HookType:
raise ValueError(f"Unsupported hook type: {hook_type}")
def register(self, hook_type: HookType | str, hook: Hook):
"""Raises ValueError if the hook type is not supported."""
if not isinstance(hook_type, HookType):
hook_type = HookType(hook_type)
self._hooks[hook_type].append(hook)
def clear(self, hook_type: Optional[HookType] = None):
def clear(self, hook_type: HookType | None = None):
if hook_type:
self._hooks[hook_type] = []
else:

View File

@ -2,6 +2,8 @@ from __future__ import annotations
from dataclasses import dataclass
from prompt_toolkit.formatted_text import FormattedText
from falyx.action import BaseAction
from falyx.signals import BackSignal, QuitSignal
from falyx.themes import OneColors
@ -26,6 +28,12 @@ class MenuOption:
"""Render the menu option for display."""
return f"[{OneColors.WHITE}][{key}][/] [{self.style}]{self.description}[/]"
def render_prompt(self, key: str) -> FormattedText:
"""Render the menu option for prompt display."""
return FormattedText(
[(OneColors.WHITE, f"[{key}] "), (self.style, self.description)]
)
class MenuOptionMap(CaseInsensitiveDict):
"""
@ -33,7 +41,7 @@ class MenuOptionMap(CaseInsensitiveDict):
and special signal entries like Quit and Back.
"""
RESERVED_KEYS = {"Q", "B"}
RESERVED_KEYS = {"B", "X"}
def __init__(
self,
@ -49,14 +57,14 @@ class MenuOptionMap(CaseInsensitiveDict):
def _inject_reserved_defaults(self):
from falyx.action import SignalAction
self._add_reserved(
"Q",
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
)
self._add_reserved(
"B",
MenuOption("Back", SignalAction("Back", BackSignal()), OneColors.DARK_YELLOW),
)
self._add_reserved(
"X",
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
)
def _add_reserved(self, key: str, option: MenuOption) -> None:
"""Add a reserved key, bypassing validation."""
@ -78,8 +86,20 @@ class MenuOptionMap(CaseInsensitiveDict):
raise ValueError(f"Cannot delete reserved option '{key}'.")
super().__delitem__(key)
def update(self, other=None, **kwargs):
"""Update the selection options with another dictionary."""
if other:
for key, option in other.items():
if not isinstance(option, MenuOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
for key, option in kwargs.items():
if not isinstance(option, MenuOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
def items(self, include_reserved: bool = True):
for k, v in super().items():
if not include_reserved and k in self.RESERVED_KEYS:
for key, option in super().items():
if not include_reserved and key in self.RESERVED_KEYS:
continue
yield k, v
yield key, option

View File

@ -7,8 +7,6 @@ Licensed under the MIT License. See LICENSE file for details.
from .argparse import Argument, ArgumentAction, CommandArgumentParser
from .parsers import FalyxParsers, get_arg_parsers
from .signature import infer_args_from_func
from .utils import same_argument_definitions
__all__ = [
"Argument",
@ -16,6 +14,4 @@ __all__ = [
"CommandArgumentParser",
"get_arg_parsers",
"FalyxParsers",
"infer_args_from_func",
"same_argument_definitions",
]

View File

@ -1,4 +1,6 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
@ -23,12 +25,21 @@ class ArgumentAction(Enum):
COUNT = "count"
HELP = "help"
@classmethod
def choices(cls) -> list[ArgumentAction]:
"""Return a list of all argument actions."""
return list(cls)
def __str__(self) -> str:
"""Return the string representation of the argument action."""
return self.value
@dataclass
class Argument:
"""Represents a command-line argument."""
flags: list[str]
flags: tuple[str, ...]
dest: str # Destination name for the argument
action: ArgumentAction = (
ArgumentAction.STORE
@ -38,7 +49,7 @@ class Argument:
choices: list[str] | None = None # List of valid choices for the argument
required: bool = False # True if the argument is required
help: str = "" # Help text for the argument
nargs: int | str = 1 # int, '?', '*', '+'
nargs: int | str | None = None # int, '?', '*', '+', None
positional: bool = False # True if no leading - or -- in flags
def get_positional_text(self) -> str:
@ -66,7 +77,11 @@ class Argument:
and not self.positional
):
choice_text = self.dest.upper()
elif isinstance(self.nargs, str):
elif self.action in (
ArgumentAction.STORE,
ArgumentAction.APPEND,
ArgumentAction.EXTEND,
) or isinstance(self.nargs, str):
choice_text = self.dest
if self.nargs == "?":
@ -136,6 +151,7 @@ class CommandArgumentParser:
aliases: list[str] | None = None,
) -> None:
"""Initialize the CommandArgumentParser."""
self.console = Console(color_system="auto")
self.command_key: str = command_key
self.command_description: str = command_description
self.command_style: str = command_style
@ -148,7 +164,6 @@ class CommandArgumentParser:
self._flag_map: dict[str, Argument] = {}
self._dest_set: set[str] = set()
self._add_help()
self.console = Console(color_system="auto")
def _add_help(self):
"""Add help argument to the parser."""
@ -170,9 +185,7 @@ class CommandArgumentParser:
raise CommandArgumentError("Positional arguments cannot have multiple flags")
return positional
def _get_dest_from_flags(
self, flags: tuple[str, ...], dest: str | None
) -> str | None:
def _get_dest_from_flags(self, flags: tuple[str, ...], dest: str | None) -> str:
"""Convert flags to a destination name."""
if dest:
if not dest.replace("_", "").isalnum():
@ -201,7 +214,7 @@ class CommandArgumentParser:
return dest
def _determine_required(
self, required: bool, positional: bool, nargs: int | str
self, required: bool, positional: bool, nargs: int | str | None
) -> bool:
"""Determine if the argument is required."""
if required:
@ -219,7 +232,22 @@ class CommandArgumentParser:
return required
def _validate_nargs(self, nargs: int | str) -> int | str:
def _validate_nargs(
self, nargs: int | str | None, action: ArgumentAction
) -> int | str | None:
if action in (
ArgumentAction.STORE_FALSE,
ArgumentAction.STORE_TRUE,
ArgumentAction.COUNT,
ArgumentAction.HELP,
):
if nargs is not None:
raise CommandArgumentError(
f"nargs cannot be specified for {action} actions"
)
return None
if nargs is None:
nargs = 1
allowed_nargs = ("?", "*", "+")
if isinstance(nargs, int):
if nargs <= 0:
@ -231,7 +259,9 @@ class CommandArgumentParser:
raise CommandArgumentError(f"nargs must be an int or one of {allowed_nargs}")
return nargs
def _normalize_choices(self, choices: Iterable, expected_type: Any) -> list[Any]:
def _normalize_choices(
self, choices: Iterable | None, expected_type: Any
) -> list[Any]:
if choices is not None:
if isinstance(choices, dict):
raise CommandArgumentError("choices cannot be a dict")
@ -278,8 +308,34 @@ class CommandArgumentParser:
f"Default list value {default!r} for '{dest}' cannot be coerced to {expected_type.__name__}"
)
def _validate_action(
self, action: ArgumentAction | str, positional: bool
) -> ArgumentAction:
if not isinstance(action, ArgumentAction):
try:
action = ArgumentAction(action)
except ValueError:
raise CommandArgumentError(
f"Invalid action '{action}' is not a valid ArgumentAction"
)
if action in (
ArgumentAction.STORE_TRUE,
ArgumentAction.STORE_FALSE,
ArgumentAction.COUNT,
ArgumentAction.HELP,
):
if positional:
raise CommandArgumentError(
f"Action '{action}' cannot be used with positional arguments"
)
return action
def _resolve_default(
self, action: ArgumentAction, default: Any, nargs: str | int
self,
default: Any,
action: ArgumentAction,
nargs: str | int | None,
) -> Any:
"""Get the default value for the argument."""
if default is None:
@ -313,7 +369,18 @@ class CommandArgumentParser:
f"Flag '{flag}' must be a single character or start with '--'"
)
def add_argument(self, *flags, **kwargs):
def add_argument(
self,
*flags,
action: str | ArgumentAction = "store",
nargs: int | str | None = None,
default: Any = None,
type: Any = str,
choices: Iterable | None = None,
required: bool = False,
help: str = "",
dest: str | None = None,
) -> None:
"""Add an argument to the parser.
Args:
name or flags: Either a name or prefixed flags (e.g. 'faylx', '-f', '--falyx').
@ -326,9 +393,10 @@ class CommandArgumentParser:
help: A brief description of the argument.
dest: The name of the attribute to be added to the object returned by parse_args().
"""
expected_type = type
self._validate_flags(flags)
positional = self._is_positional(flags)
dest = self._get_dest_from_flags(flags, kwargs.get("dest"))
dest = self._get_dest_from_flags(flags, dest)
if dest in self._dest_set:
raise CommandArgumentError(
f"Destination '{dest}' is already defined.\n"
@ -336,18 +404,9 @@ class CommandArgumentParser:
"is not supported. Define a unique 'dest' for each argument."
)
self._dest_set.add(dest)
action = kwargs.get("action", ArgumentAction.STORE)
if not isinstance(action, ArgumentAction):
try:
action = ArgumentAction(action)
except ValueError:
raise CommandArgumentError(
f"Invalid action '{action}' is not a valid ArgumentAction"
)
flags = list(flags)
nargs = self._validate_nargs(kwargs.get("nargs", 1))
default = self._resolve_default(action, kwargs.get("default"), nargs)
expected_type = kwargs.get("type", str)
action = self._validate_action(action, positional)
nargs = self._validate_nargs(nargs, action)
default = self._resolve_default(default, action, nargs)
if (
action in (ArgumentAction.STORE, ArgumentAction.APPEND, ArgumentAction.EXTEND)
and default is not None
@ -356,14 +415,12 @@ class CommandArgumentParser:
self._validate_default_list_type(default, expected_type, dest)
else:
self._validate_default_type(default, expected_type, dest)
choices = self._normalize_choices(kwargs.get("choices"), expected_type)
choices = self._normalize_choices(choices, expected_type)
if default is not None and choices and default not in choices:
raise CommandArgumentError(
f"Default value '{default}' not in allowed choices: {choices}"
)
required = self._determine_required(
kwargs.get("required", False), positional, nargs
)
required = self._determine_required(required, positional, nargs)
argument = Argument(
flags=flags,
dest=dest,
@ -372,7 +429,7 @@ class CommandArgumentParser:
default=default,
choices=choices,
required=required,
help=kwargs.get("help", ""),
help=help,
nargs=nargs,
positional=positional,
)
@ -415,11 +472,11 @@ class CommandArgumentParser:
values = []
i = start
if isinstance(spec.nargs, int):
# assert i + spec.nargs <= len(
# args
# ), "Not enough arguments provided: shouldn't happen"
values = args[i : i + spec.nargs]
return values, i + spec.nargs
elif spec.nargs is None:
values = [args[i]]
return values, i + 1
elif spec.nargs == "+":
if i >= len(args):
raise CommandArgumentError(
@ -464,6 +521,8 @@ class CommandArgumentParser:
for next_spec in positional_args[j + 1 :]:
if isinstance(next_spec.nargs, int):
min_required += next_spec.nargs
elif next_spec.nargs is None:
min_required += 1
elif next_spec.nargs == "+":
min_required += 1
elif next_spec.nargs == "?":
@ -506,7 +565,7 @@ class CommandArgumentParser:
return i
def parse_args(
async def parse_args(
self, args: list[str] | None = None, from_validate: bool = False
) -> dict[str, Any]:
"""Parse Falyx Command arguments."""
@ -654,7 +713,7 @@ class CommandArgumentParser:
result.pop("help", None)
return result
def parse_args_split(
async def parse_args_split(
self, args: list[str], from_validate: bool = False
) -> tuple[tuple[Any, ...], dict[str, Any]]:
"""
@ -662,7 +721,7 @@ class CommandArgumentParser:
tuple[args, kwargs] - Positional arguments in defined order,
followed by keyword argument mapping.
"""
parsed = self.parse_args(args, from_validate)
parsed = await self.parse_args(args, from_validate)
args_list = []
kwargs_dict = {}
for arg in self._arguments:

View File

@ -114,7 +114,7 @@ def get_arg_parsers(
help="Skip confirmation prompts",
)
run_group.add_argument(
run_parser.add_argument(
"command_args",
nargs=REMAINDER,
help="Arguments to pass to the command (if applicable)",

View File

@ -1,17 +1,20 @@
import inspect
from typing import Any, Callable
from falyx import logger
from falyx.logger import logger
def infer_args_from_func(
func: Callable[[Any], Any],
func: Callable[[Any], Any] | None,
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
"""
Infer argument definitions from a callable's signature.
Returns a list of kwargs suitable for CommandArgumentParser.add_argument.
"""
if not callable(func):
logger.debug("Provided argument is not callable: %s", func)
return []
arg_metadata = arg_metadata or {}
signature = inspect.signature(func)
arg_defs = []
@ -39,7 +42,7 @@ def infer_args_from_func(
else:
flags = [f"--{name.replace('_', '-')}"]
action = "store"
nargs: int | str = 1
nargs: int | str | None = None
if arg_type is bool:
if param.default is False:

View File

@ -1,7 +1,6 @@
from typing import Any
from falyx import logger
from falyx.action.action import Action, ChainedAction, ProcessAction
from falyx.parsers.signature import infer_args_from_func
@ -9,17 +8,13 @@ def same_argument_definitions(
actions: list[Any],
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> list[dict[str, Any]] | None:
from falyx.action.action import BaseAction
arg_sets = []
for action in actions:
if isinstance(action, (Action, ProcessAction)):
arg_defs = infer_args_from_func(action.action, arg_metadata)
elif isinstance(action, ChainedAction):
if action.actions:
action = action.actions[0]
if isinstance(action, Action):
arg_defs = infer_args_from_func(action.action, arg_metadata)
elif callable(action):
arg_defs = infer_args_from_func(action, arg_metadata)
if isinstance(action, BaseAction):
infer_target, _ = action.get_infer_target()
arg_defs = infer_args_from_func(infer_target, arg_metadata)
elif callable(action):
arg_defs = infer_args_from_func(action, arg_metadata)
else:

View File

@ -10,7 +10,7 @@ from rich.markup import escape
from rich.table import Table
from falyx.themes import OneColors
from falyx.utils import chunks
from falyx.utils import CaseInsensitiveDict, chunks
from falyx.validators import int_range_validator, key_validator
@ -32,6 +32,62 @@ class SelectionOption:
return f"[{OneColors.WHITE}]{key}[/] [{self.style}]{self.description}[/]"
class SelectionOptionMap(CaseInsensitiveDict):
"""
Manages selection options including validation and reserved key protection.
"""
RESERVED_KEYS: set[str] = set()
def __init__(
self,
options: dict[str, SelectionOption] | None = None,
allow_reserved: bool = False,
):
super().__init__()
self.allow_reserved = allow_reserved
if options:
self.update(options)
def _add_reserved(self, key: str, option: SelectionOption) -> None:
"""Add a reserved key, bypassing validation."""
norm_key = key.upper()
super().__setitem__(norm_key, option)
def __setitem__(self, key: str, option: SelectionOption) -> None:
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
norm_key = key.upper()
if norm_key in self.RESERVED_KEYS and not self.allow_reserved:
raise ValueError(
f"Key '{key}' is reserved and cannot be used in SelectionOptionMap."
)
super().__setitem__(norm_key, option)
def __delitem__(self, key: str) -> None:
if key.upper() in self.RESERVED_KEYS and not self.allow_reserved:
raise ValueError(f"Cannot delete reserved option '{key}'.")
super().__delitem__(key)
def update(self, other=None, **kwargs):
"""Update the selection options with another dictionary."""
if other:
for key, option in other.items():
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
for key, option in kwargs.items():
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
def items(self, include_reserved: bool = True):
for k, v in super().items():
if not include_reserved and k in self.RESERVED_KEYS:
continue
yield k, v
def render_table_base(
title: str,
*,
@ -215,7 +271,7 @@ async def prompt_for_index(
prompt_session: PromptSession | None = None,
prompt_message: str = "Select an option > ",
show_table: bool = True,
):
) -> int:
prompt_session = prompt_session or PromptSession()
console = console or Console(color_system="auto")

View File

@ -1 +1 @@
__version__ = "0.1.29"
__version__ = "0.1.35"

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "falyx"
version = "0.1.29"
version = "0.1.35"
description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT"

View File

@ -1,7 +1,7 @@
# test_command.py
import pytest
from falyx.action import Action, ActionGroup, BaseIOAction, ChainedAction
from falyx.action import Action, BaseIOAction, ChainedAction
from falyx.command import Command
from falyx.execution_registry import ExecutionRegistry as er
from falyx.retry import RetryPolicy
@ -56,102 +56,6 @@ def test_command_str():
)
@pytest.mark.parametrize(
"action_factory, expected_requires_input",
[
(lambda: Action(name="normal", action=dummy_action), False),
(lambda: DummyInputAction(name="io"), True),
(
lambda: ChainedAction(name="chain", actions=[DummyInputAction(name="io")]),
True,
),
(
lambda: ActionGroup(name="group", actions=[DummyInputAction(name="io")]),
True,
),
],
)
def test_command_requires_input_detection(action_factory, expected_requires_input):
action = action_factory()
cmd = Command(key="TEST", description="Test Command", action=action)
assert cmd.requires_input == expected_requires_input
if expected_requires_input:
assert cmd.hidden is True
else:
assert cmd.hidden is False
def test_requires_input_flag_detected_for_baseioaction():
"""Command should automatically detect requires_input=True for BaseIOAction."""
cmd = Command(
key="X",
description="Echo input",
action=DummyInputAction(name="dummy"),
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_requires_input_manual_override():
"""Command manually set requires_input=False should not auto-hide."""
cmd = Command(
key="Y",
description="Custom input command",
action=DummyInputAction(name="dummy"),
requires_input=False,
)
assert cmd.requires_input is False
assert cmd.hidden is False
def test_default_command_does_not_require_input():
"""Normal Command without IO Action should not require input."""
cmd = Command(
key="Z",
description="Simple action",
action=lambda: 42,
)
assert cmd.requires_input is False
assert cmd.hidden is False
def test_chain_requires_input():
"""If first action in a chain requires input, the command should require input."""
chain = ChainedAction(
name="ChainWithInput",
actions=[
DummyInputAction(name="dummy"),
Action(name="action1", action=lambda: 1),
],
)
cmd = Command(
key="A",
description="Chain with input",
action=chain,
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_group_requires_input():
"""If any action in a group requires input, the command should require input."""
group = ActionGroup(
name="GroupWithInput",
actions=[
Action(name="action1", action=lambda: 1),
DummyInputAction(name="dummy"),
],
)
cmd = Command(
key="B",
description="Group with input",
action=group,
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_enable_retry():
"""Command should enable retry if action is an Action and retry is set to True."""
cmd = Command(

View File

@ -5,98 +5,109 @@ from falyx.parsers import ArgumentAction, CommandArgumentParser
from falyx.signals import HelpSignal
def build_parser_and_parse(args, config):
async def build_parser_and_parse(args, config):
cap = CommandArgumentParser()
config(cap)
return cap.parse_args(args)
return await cap.parse_args(args)
def test_none():
@pytest.mark.asyncio
async def test_none():
def config(parser):
parser.add_argument("--foo", type=str)
parsed = build_parser_and_parse(None, config)
parsed = await build_parser_and_parse(None, config)
assert parsed["foo"] is None
def test_append_multiple_flags():
@pytest.mark.asyncio
async def test_append_multiple_flags():
def config(parser):
parser.add_argument("--tag", action=ArgumentAction.APPEND, type=str)
parsed = build_parser_and_parse(["--tag", "a", "--tag", "b", "--tag", "c"], config)
parsed = await build_parser_and_parse(
["--tag", "a", "--tag", "b", "--tag", "c"], config
)
assert parsed["tag"] == ["a", "b", "c"]
def test_positional_nargs_plus_and_single():
@pytest.mark.asyncio
async def test_positional_nargs_plus_and_single():
def config(parser):
parser.add_argument("files", nargs="+", type=str)
parser.add_argument("mode", nargs=1)
parsed = build_parser_and_parse(["a", "b", "c", "prod"], config)
parsed = await build_parser_and_parse(["a", "b", "c", "prod"], config)
assert parsed["files"] == ["a", "b", "c"]
assert parsed["mode"] == "prod"
def test_type_validation_failure():
@pytest.mark.asyncio
async def test_type_validation_failure():
def config(parser):
parser.add_argument("--count", type=int)
with pytest.raises(CommandArgumentError):
build_parser_and_parse(["--count", "abc"], config)
await build_parser_and_parse(["--count", "abc"], config)
def test_required_field_missing():
@pytest.mark.asyncio
async def test_required_field_missing():
def config(parser):
parser.add_argument("--env", type=str, required=True)
with pytest.raises(CommandArgumentError):
build_parser_and_parse([], config)
await build_parser_and_parse([], config)
def test_choices_enforced():
@pytest.mark.asyncio
async def test_choices_enforced():
def config(parser):
parser.add_argument("--mode", choices=["dev", "prod"])
with pytest.raises(CommandArgumentError):
build_parser_and_parse(["--mode", "staging"], config)
await build_parser_and_parse(["--mode", "staging"], config)
def test_boolean_flags():
@pytest.mark.asyncio
async def test_boolean_flags():
def config(parser):
parser.add_argument("--debug", action=ArgumentAction.STORE_TRUE)
parser.add_argument("--no-debug", action=ArgumentAction.STORE_FALSE)
parsed = build_parser_and_parse(["--debug", "--no-debug"], config)
parsed = await build_parser_and_parse(["--debug", "--no-debug"], config)
assert parsed["debug"] is True
assert parsed["no_debug"] is False
parsed = build_parser_and_parse([], config)
print(parsed)
parsed = await build_parser_and_parse([], config)
assert parsed["debug"] is False
assert parsed["no_debug"] is True
def test_count_action():
@pytest.mark.asyncio
async def test_count_action():
def config(parser):
parser.add_argument("-v", action=ArgumentAction.COUNT)
parsed = build_parser_and_parse(["-v", "-v", "-v"], config)
parsed = await build_parser_and_parse(["-v", "-v", "-v"], config)
assert parsed["v"] == 3
def test_nargs_star():
@pytest.mark.asyncio
async def test_nargs_star():
def config(parser):
parser.add_argument("args", nargs="*", type=str)
parsed = build_parser_and_parse(["one", "two", "three"], config)
parsed = await build_parser_and_parse(["one", "two", "three"], config)
assert parsed["args"] == ["one", "two", "three"]
def test_flag_and_positional_mix():
@pytest.mark.asyncio
async def test_flag_and_positional_mix():
def config(parser):
parser.add_argument("--env", type=str)
parser.add_argument("tasks", nargs="+")
parsed = build_parser_and_parse(["--env", "prod", "build", "test"], config)
parsed = await build_parser_and_parse(["--env", "prod", "build", "test"], config)
assert parsed["env"] == "prod"
assert parsed["tasks"] == ["build", "test"]
@ -134,7 +145,7 @@ def test_add_argument_multiple_optional_flags_same_dest():
parser.add_argument("-f", "--falyx")
arg = parser._arguments[-1]
assert arg.dest == "falyx"
assert arg.flags == ["-f", "--falyx"]
assert arg.flags == ("-f", "--falyx")
def test_add_argument_flag_dest_conflict():
@ -165,7 +176,7 @@ def test_add_argument_multiple_flags_custom_dest():
parser.add_argument("-f", "--falyx", "--test", dest="falyx")
arg = parser._arguments[-1]
assert arg.dest == "falyx"
assert arg.flags == ["-f", "--falyx", "--test"]
assert arg.flags == ("-f", "--falyx", "--test")
def test_add_argument_multiple_flags_dest():
@ -175,7 +186,7 @@ def test_add_argument_multiple_flags_dest():
parser.add_argument("-f", "--falyx", "--test")
arg = parser._arguments[-1]
assert arg.dest == "falyx"
assert arg.flags == ["-f", "--falyx", "--test"]
assert arg.flags == ("-f", "--falyx", "--test")
def test_add_argument_single_flag_dest():
@ -185,7 +196,7 @@ def test_add_argument_single_flag_dest():
parser.add_argument("-f")
arg = parser._arguments[-1]
assert arg.dest == "f"
assert arg.flags == ["-f"]
assert arg.flags == ("-f",)
def test_add_argument_bad_dest():
@ -257,7 +268,7 @@ def test_add_argument_default_value():
parser.add_argument("--falyx", default="default_value")
arg = parser._arguments[-1]
assert arg.dest == "falyx"
assert arg.flags == ["--falyx"]
assert arg.flags == ("--falyx",)
assert arg.default == "default_value"
@ -297,20 +308,21 @@ def test_add_argument_default_not_in_choices():
parser.add_argument("--falyx", choices=["a", "b"], default="c")
def test_add_argument_choices():
@pytest.mark.asyncio
async def test_add_argument_choices():
parser = CommandArgumentParser()
# ✅ Choices provided
parser.add_argument("--falyx", choices=["a", "b", "c"])
arg = parser._arguments[-1]
assert arg.dest == "falyx"
assert arg.flags == ["--falyx"]
assert arg.flags == ("--falyx",)
assert arg.choices == ["a", "b", "c"]
args = parser.parse_args(["--falyx", "a"])
args = await parser.parse_args(["--falyx", "a"])
assert args["falyx"] == "a"
with pytest.raises(CommandArgumentError):
parser.parse_args(["--falyx", "d"])
await parser.parse_args(["--falyx", "d"])
def test_add_argument_choices_invalid():
@ -352,7 +364,7 @@ def test_add_argument_nargs():
parser.add_argument("--falyx", nargs=2)
arg = parser._arguments[-1]
assert arg.dest == "falyx"
assert arg.flags == ["--falyx"]
assert arg.flags == ("--falyx",)
assert arg.nargs == 2
@ -377,56 +389,60 @@ def test_get_argument():
parser.add_argument("--falyx", type=str, default="default_value")
arg = parser.get_argument("falyx")
assert arg.dest == "falyx"
assert arg.flags == ["--falyx"]
assert arg.flags == ("--falyx",)
assert arg.default == "default_value"
def test_parse_args_nargs():
@pytest.mark.asyncio
async def test_parse_args_nargs():
parser = CommandArgumentParser()
parser.add_argument("files", nargs="+", type=str)
parser.add_argument("mode", nargs=1)
args = parser.parse_args(["a", "b", "c"])
args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b"]
assert args["mode"] == "c"
def test_parse_args_nargs_plus():
@pytest.mark.asyncio
async def test_parse_args_nargs_plus():
parser = CommandArgumentParser()
parser.add_argument("files", nargs="+", type=str)
args = parser.parse_args(["a", "b", "c"])
args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"]
args = parser.parse_args(["a"])
args = await parser.parse_args(["a"])
assert args["files"] == ["a"]
def test_parse_args_flagged_nargs_plus():
@pytest.mark.asyncio
async def test_parse_args_flagged_nargs_plus():
parser = CommandArgumentParser()
parser.add_argument("--files", nargs="+", type=str)
args = parser.parse_args(["--files", "a", "b", "c"])
args = await parser.parse_args(["--files", "a", "b", "c"])
assert args["files"] == ["a", "b", "c"]
args = parser.parse_args(["--files", "a"])
args = await parser.parse_args(["--files", "a"])
print(args)
assert args["files"] == ["a"]
args = parser.parse_args([])
args = await parser.parse_args([])
assert args["files"] == []
def test_parse_args_numbered_nargs():
@pytest.mark.asyncio
async def test_parse_args_numbered_nargs():
parser = CommandArgumentParser()
parser.add_argument("files", nargs=2, type=str)
args = parser.parse_args(["a", "b"])
args = await parser.parse_args(["a", "b"])
assert args["files"] == ["a", "b"]
with pytest.raises(CommandArgumentError):
args = parser.parse_args(["a"])
args = await parser.parse_args(["a"])
print(args)
@ -436,48 +452,53 @@ def test_parse_args_nargs_zero():
parser.add_argument("files", nargs=0, type=str)
def test_parse_args_nargs_more_than_expected():
@pytest.mark.asyncio
async def test_parse_args_nargs_more_than_expected():
parser = CommandArgumentParser()
parser.add_argument("files", nargs=2, type=str)
with pytest.raises(CommandArgumentError):
parser.parse_args(["a", "b", "c", "d"])
await parser.parse_args(["a", "b", "c", "d"])
def test_parse_args_nargs_one_or_none():
@pytest.mark.asyncio
async def test_parse_args_nargs_one_or_none():
parser = CommandArgumentParser()
parser.add_argument("files", nargs="?", type=str)
args = parser.parse_args(["a"])
args = await parser.parse_args(["a"])
assert args["files"] == "a"
args = parser.parse_args([])
args = await parser.parse_args([])
assert args["files"] is None
def test_parse_args_nargs_positional():
@pytest.mark.asyncio
async def test_parse_args_nargs_positional():
parser = CommandArgumentParser()
parser.add_argument("files", nargs="*", type=str)
args = parser.parse_args(["a", "b", "c"])
args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"]
args = parser.parse_args([])
args = await parser.parse_args([])
assert args["files"] == []
def test_parse_args_nargs_positional_plus():
@pytest.mark.asyncio
async def test_parse_args_nargs_positional_plus():
parser = CommandArgumentParser()
parser.add_argument("files", nargs="+", type=str)
args = parser.parse_args(["a", "b", "c"])
args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"]
with pytest.raises(CommandArgumentError):
args = parser.parse_args([])
args = await parser.parse_args([])
def test_parse_args_nargs_multiple_positional():
@pytest.mark.asyncio
async def test_parse_args_nargs_multiple_positional():
parser = CommandArgumentParser()
parser.add_argument("files", nargs="+", type=str)
parser.add_argument("mode", nargs=1)
@ -485,7 +506,7 @@ def test_parse_args_nargs_multiple_positional():
parser.add_argument("target", nargs="*")
parser.add_argument("extra", nargs="+")
args = parser.parse_args(["a", "b", "c", "d", "e"])
args = await parser.parse_args(["a", "b", "c", "d", "e"])
assert args["files"] == ["a", "b", "c"]
assert args["mode"] == "d"
assert args["action"] == []
@ -493,186 +514,209 @@ def test_parse_args_nargs_multiple_positional():
assert args["extra"] == ["e"]
with pytest.raises(CommandArgumentError):
parser.parse_args([])
await parser.parse_args([])
def test_parse_args_nargs_invalid_positional_arguments():
@pytest.mark.asyncio
async def test_parse_args_nargs_invalid_positional_arguments():
parser = CommandArgumentParser()
parser.add_argument("numbers", nargs="*", type=int)
parser.add_argument("mode", nargs=1)
with pytest.raises(CommandArgumentError):
parser.parse_args(["1", "2", "c", "d"])
await parser.parse_args(["1", "2", "c", "d"])
def test_parse_args_append():
@pytest.mark.asyncio
async def test_parse_args_append():
parser = CommandArgumentParser()
parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int)
args = parser.parse_args(["--numbers", "1", "--numbers", "2", "--numbers", "3"])
args = await parser.parse_args(["--numbers", "1", "--numbers", "2", "--numbers", "3"])
assert args["numbers"] == [1, 2, 3]
args = parser.parse_args(["--numbers", "1"])
args = await parser.parse_args(["--numbers", "1"])
assert args["numbers"] == [1]
args = parser.parse_args([])
args = await parser.parse_args([])
assert args["numbers"] == []
def test_parse_args_nargs_append():
@pytest.mark.asyncio
async def test_parse_args_nargs_append():
parser = CommandArgumentParser()
parser.add_argument("numbers", action=ArgumentAction.APPEND, type=int, nargs="*")
parser.add_argument("--mode")
args = parser.parse_args(["1", "2", "3", "--mode", "numbers", "4", "5"])
args = await parser.parse_args(["1", "2", "3", "--mode", "numbers", "4", "5"])
assert args["numbers"] == [[1, 2, 3], [4, 5]]
args = parser.parse_args(["1"])
args = await parser.parse_args(["1"])
assert args["numbers"] == [[1]]
args = parser.parse_args([])
args = await parser.parse_args([])
assert args["numbers"] == []
def test_parse_args_append_flagged_invalid_type():
@pytest.mark.asyncio
async def test_parse_args_append_flagged_invalid_type():
parser = CommandArgumentParser()
parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int)
with pytest.raises(CommandArgumentError):
parser.parse_args(["--numbers", "a"])
await parser.parse_args(["--numbers", "a"])
def test_append_groups_nargs():
@pytest.mark.asyncio
async def test_append_groups_nargs():
cap = CommandArgumentParser()
cap.add_argument("--item", action=ArgumentAction.APPEND, type=str, nargs=2)
parsed = cap.parse_args(["--item", "a", "b", "--item", "c", "d"])
parsed = await cap.parse_args(["--item", "a", "b", "--item", "c", "d"])
assert parsed["item"] == [["a", "b"], ["c", "d"]]
def test_extend_flattened():
@pytest.mark.asyncio
async def test_extend_flattened():
cap = CommandArgumentParser()
cap.add_argument("--value", action=ArgumentAction.EXTEND, type=str)
parsed = cap.parse_args(["--value", "x", "--value", "y"])
parsed = await cap.parse_args(["--value", "x", "--value", "y"])
assert parsed["value"] == ["x", "y"]
def test_parse_args_split_order():
@pytest.mark.asyncio
async def test_parse_args_split_order():
cap = CommandArgumentParser()
cap.add_argument("a")
cap.add_argument("--x")
cap.add_argument("b", nargs="*")
args, kwargs = cap.parse_args_split(["1", "--x", "100", "2"])
args, kwargs = await cap.parse_args_split(["1", "--x", "100", "2"])
assert args == ("1", ["2"])
assert kwargs == {"x": "100"}
def test_help_signal_triggers():
@pytest.mark.asyncio
async def test_help_signal_triggers():
parser = CommandArgumentParser()
parser.add_argument("--foo")
with pytest.raises(HelpSignal):
parser.parse_args(["--help"])
await parser.parse_args(["--help"])
def test_empty_parser_defaults():
@pytest.mark.asyncio
async def test_empty_parser_defaults():
parser = CommandArgumentParser()
with pytest.raises(HelpSignal):
parser.parse_args(["--help"])
await parser.parse_args(["--help"])
def test_extend_basic():
@pytest.mark.asyncio
async def test_extend_basic():
parser = CommandArgumentParser()
parser.add_argument("--tag", action=ArgumentAction.EXTEND, type=str)
args = parser.parse_args(["--tag", "a", "--tag", "b", "--tag", "c"])
args = await parser.parse_args(["--tag", "a", "--tag", "b", "--tag", "c"])
assert args["tag"] == ["a", "b", "c"]
def test_extend_nargs_2():
@pytest.mark.asyncio
async def test_extend_nargs_2():
parser = CommandArgumentParser()
parser.add_argument("--pair", action=ArgumentAction.EXTEND, type=str, nargs=2)
args = parser.parse_args(["--pair", "a", "b", "--pair", "c", "d"])
args = await parser.parse_args(["--pair", "a", "b", "--pair", "c", "d"])
assert args["pair"] == ["a", "b", "c", "d"]
def test_extend_nargs_star():
@pytest.mark.asyncio
async def test_extend_nargs_star():
parser = CommandArgumentParser()
parser.add_argument("--files", action=ArgumentAction.EXTEND, type=str, nargs="*")
args = parser.parse_args(["--files", "x", "y", "z"])
args = await parser.parse_args(["--files", "x", "y", "z"])
assert args["files"] == ["x", "y", "z"]
args = parser.parse_args(["--files"])
args = await parser.parse_args(["--files"])
assert args["files"] == []
def test_extend_nargs_plus():
@pytest.mark.asyncio
async def test_extend_nargs_plus():
parser = CommandArgumentParser()
parser.add_argument("--inputs", action=ArgumentAction.EXTEND, type=int, nargs="+")
args = parser.parse_args(["--inputs", "1", "2", "3", "--inputs", "4"])
args = await parser.parse_args(["--inputs", "1", "2", "3", "--inputs", "4"])
assert args["inputs"] == [1, 2, 3, 4]
def test_extend_invalid_type():
@pytest.mark.asyncio
async def test_extend_invalid_type():
parser = CommandArgumentParser()
parser.add_argument("--nums", action=ArgumentAction.EXTEND, type=int)
with pytest.raises(CommandArgumentError):
parser.parse_args(["--nums", "a"])
await parser.parse_args(["--nums", "a"])
def test_greedy_invalid_type():
@pytest.mark.asyncio
async def test_greedy_invalid_type():
parser = CommandArgumentParser()
parser.add_argument("--nums", nargs="*", type=int)
with pytest.raises(CommandArgumentError):
parser.parse_args(["--nums", "a"])
await parser.parse_args(["--nums", "a"])
def test_append_vs_extend_behavior():
@pytest.mark.asyncio
async def test_append_vs_extend_behavior():
parser = CommandArgumentParser()
parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2)
parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2)
args = parser.parse_args(
args = await parser.parse_args(
["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3", "4"]
)
assert args["x"] == [["a", "b"], ["c", "d"]]
assert args["y"] == ["1", "2", "3", "4"]
def test_append_vs_extend_behavior_error():
@pytest.mark.asyncio
async def test_append_vs_extend_behavior_error():
parser = CommandArgumentParser()
parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2)
parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2)
# This should raise an error because the last argument is not a valid pair
with pytest.raises(CommandArgumentError):
parser.parse_args(["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3"])
await parser.parse_args(
["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3"]
)
with pytest.raises(CommandArgumentError):
parser.parse_args(["--x", "a", "b", "--x", "c", "--y", "1", "--y", "3", "4"])
await parser.parse_args(
["--x", "a", "b", "--x", "c", "--y", "1", "--y", "3", "4"]
)
def test_extend_positional():
@pytest.mark.asyncio
async def test_extend_positional():
parser = CommandArgumentParser()
parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="*")
args = parser.parse_args(["a", "b", "c"])
args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"]
args = parser.parse_args([])
args = await parser.parse_args([])
assert args["files"] == []
def test_extend_positional_nargs():
@pytest.mark.asyncio
async def test_extend_positional_nargs():
parser = CommandArgumentParser()
parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="+")
args = parser.parse_args(["a", "b", "c"])
args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"]
with pytest.raises(CommandArgumentError):
parser.parse_args([])
await parser.parse_args([])