7 Commits

34 changed files with 884 additions and 488 deletions

View File

@ -6,7 +6,7 @@ from falyx.action import ActionFactoryAction, ChainedAction, HTTPAction, Selecti
# Selection of a post ID to fetch (just an example set) # Selection of a post ID to fetch (just an example set)
post_selector = SelectionAction( post_selector = SelectionAction(
name="Pick Post ID", name="Pick Post ID",
selections=["1", "2", "3", "4", "5"], selections=["15", "25", "35", "45", "55"],
title="Choose a Post ID to submit", title="Choose a Post ID to submit",
prompt_message="Post ID > ", prompt_message="Post ID > ",
show_table=True, show_table=True,
@ -14,7 +14,7 @@ post_selector = SelectionAction(
# Factory that builds and executes the actual HTTP POST request # Factory that builds and executes the actual HTTP POST request
def build_post_action(post_id) -> HTTPAction: async def build_post_action(post_id) -> HTTPAction:
print(f"Building HTTPAction for Post ID: {post_id}") print(f"Building HTTPAction for Post ID: {post_id}")
return HTTPAction( return HTTPAction(
name=f"POST to /posts (id={post_id})", name=f"POST to /posts (id={post_id})",

View File

@ -24,7 +24,6 @@ cmd = Command(
key="G", key="G",
description="Greet someone with multiple variations.", description="Greet someone with multiple variations.",
action=group, action=group,
auto_args=True,
arg_metadata={ arg_metadata={
"name": { "name": {
"help": "The name of the person to greet.", "help": "The name of the person to greet.",

View File

@ -1,14 +1,18 @@
import asyncio import asyncio
from falyx import Action, Falyx from falyx import Action, ChainedAction, Falyx
from falyx.utils import setup_logging
setup_logging()
async def deploy(service: str, region: str = "us-east-1", verbose: bool = False): async def deploy(service: str, region: str = "us-east-1", verbose: bool = False) -> str:
if verbose: if verbose:
print(f"Deploying {service} to {region}...") print(f"Deploying {service} to {region}...")
await asyncio.sleep(2) await asyncio.sleep(2)
if verbose: if verbose:
print(f"{service} deployed successfully!") print(f"{service} deployed successfully!")
return f"{service} deployed to {region}"
flx = Falyx("Deployment CLI") flx = Falyx("Deployment CLI")
@ -21,7 +25,6 @@ flx.add_command(
name="deploy_service", name="deploy_service",
action=deploy, action=deploy,
), ),
auto_args=True,
arg_metadata={ arg_metadata={
"service": "Service name", "service": "Service name",
"region": {"help": "Deployment region", "choices": ["us-east-1", "us-west-2"]}, "region": {"help": "Deployment region", "choices": ["us-east-1", "us-west-2"]},
@ -29,4 +32,23 @@ flx.add_command(
}, },
) )
deploy_chain = ChainedAction(
name="DeployChain",
actions=[
Action(name="deploy_service", action=deploy),
Action(
name="notify",
action=lambda last_result: print(f"Notification: {last_result}"),
),
],
auto_inject=True,
)
flx.add_command(
key="N",
aliases=["notify"],
description="Deploy a service and notify.",
action=deploy_chain,
)
asyncio.run(flx.run()) asyncio.run(flx.run())

View File

@ -4,7 +4,6 @@ from rich.console import Console
from falyx import ActionGroup, Falyx from falyx import ActionGroup, Falyx
from falyx.action import HTTPAction from falyx.action import HTTPAction
from falyx.hook_manager import HookType
from falyx.hooks import ResultReporter from falyx.hooks import ResultReporter
console = Console() console = Console()
@ -49,7 +48,7 @@ action_group = ActionGroup(
reporter = ResultReporter() reporter = ResultReporter()
action_group.hooks.register( action_group.hooks.register(
HookType.ON_SUCCESS, "on_success",
reporter.report, reporter.report,
) )

View File

@ -2,8 +2,16 @@ import asyncio
import time import time
from falyx import Falyx from falyx import Falyx
from falyx.action import Action, ActionGroup, ChainedAction, MenuAction, ProcessAction from falyx.action import (
Action,
ActionGroup,
ChainedAction,
MenuAction,
ProcessAction,
PromptMenuAction,
)
from falyx.menu import MenuOption, MenuOptionMap from falyx.menu import MenuOption, MenuOptionMap
from falyx.themes import OneColors
# Basic coroutine for Action # Basic coroutine for Action
@ -77,20 +85,28 @@ parallel = ActionGroup(
process = ProcessAction(name="compute", action=heavy_computation) process = ProcessAction(name="compute", action=heavy_computation)
menu_options = MenuOptionMap(
{
"A": MenuOption("Run basic Action", basic_action, style=OneColors.LIGHT_YELLOW),
"C": MenuOption("Run ChainedAction", chained, style=OneColors.MAGENTA),
"P": MenuOption("Run ActionGroup (parallel)", parallel, style=OneColors.CYAN),
"H": MenuOption("Run ProcessAction (heavy task)", process, style=OneColors.GREEN),
}
)
# Menu setup # Menu setup
menu = MenuAction( menu = MenuAction(
name="main-menu", name="main-menu",
title="Choose a task to run", title="Choose a task to run",
menu_options=MenuOptionMap( menu_options=menu_options,
{ )
"1": MenuOption("Run basic Action", basic_action),
"2": MenuOption("Run ChainedAction", chained),
"3": MenuOption("Run ActionGroup (parallel)", parallel), prompt_menu = PromptMenuAction(
"4": MenuOption("Run ProcessAction (heavy task)", process), name="select-user",
} menu_options=menu_options,
),
) )
flx = Falyx( flx = Falyx(
@ -108,6 +124,13 @@ flx.add_command(
logging_hooks=True, logging_hooks=True,
) )
flx.add_command(
key="P",
description="Show Prompt Menu",
action=prompt_menu,
logging_hooks=True,
)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(flx.run()) asyncio.run(flx.run())

View File

@ -3,7 +3,6 @@ import asyncio
from falyx import Action, ActionGroup, ChainedAction from falyx import Action, ActionGroup, ChainedAction
from falyx import ExecutionRegistry as er from falyx import ExecutionRegistry as er
from falyx import ProcessAction from falyx import ProcessAction
from falyx.hook_manager import HookType
from falyx.retry import RetryHandler, RetryPolicy from falyx.retry import RetryHandler, RetryPolicy
@ -47,7 +46,7 @@ def build_pipeline():
checkout = Action("Checkout", checkout_code) checkout = Action("Checkout", checkout_code)
analysis = ProcessAction("Static Analysis", run_static_analysis) analysis = ProcessAction("Static Analysis", run_static_analysis)
tests = Action("Run Tests", flaky_tests) tests = Action("Run Tests", flaky_tests)
tests.hooks.register(HookType.ON_ERROR, retry_handler.retry_on_error) tests.hooks.register("on_error", retry_handler.retry_on_error)
# Parallel deploys # Parallel deploys
deploy_group = ActionGroup( deploy_group = ActionGroup(

View File

@ -1,22 +1,30 @@
import asyncio import asyncio
from falyx.selection import ( from falyx.action import SelectionAction
SelectionOption, from falyx.selection import SelectionOption
prompt_for_selection, from falyx.signals import CancelSignal
render_selection_dict_table,
)
menu = { selections = {
"A": SelectionOption("Run diagnostics", lambda: print("Running diagnostics...")), "1": SelectionOption(
"B": SelectionOption("Deploy to staging", lambda: print("Deploying...")), description="Production", value="3bc2616e-3696-11f0-a139-089204eb86ac"
),
"2": SelectionOption(
description="Staging", value="42f2cd84-3696-11f0-a139-089204eb86ac"
),
} }
table = render_selection_dict_table(
title="Main Menu", select = SelectionAction(
selections=menu, name="Select Deployment",
selections=selections,
title="Select a Deployment",
columns=2,
prompt_message="> ",
return_type="value",
show_table=True,
) )
key = asyncio.run(prompt_for_selection(menu.keys(), table)) try:
print(f"You selected: {key}") print(asyncio.run(select()))
except CancelSignal:
menu[key.upper()].value() print("Selection was cancelled.")

View File

@ -3,7 +3,6 @@ import asyncio
from falyx import Action, ChainedAction, Falyx from falyx import Action, ChainedAction, Falyx
from falyx.action import ShellAction from falyx.action import ShellAction
from falyx.hook_manager import HookType
from falyx.hooks import ResultReporter from falyx.hooks import ResultReporter
from falyx.utils import setup_logging from falyx.utils import setup_logging
@ -42,12 +41,12 @@ reporter = ResultReporter()
a1 = Action("a1", a1, inject_last_result=True) a1 = Action("a1", a1, inject_last_result=True)
a1.hooks.register( a1.hooks.register(
HookType.ON_SUCCESS, "on_success",
reporter.report, reporter.report,
) )
a2 = Action("a2", a2, inject_last_result=True) a2 = Action("a2", a2, inject_last_result=True)
a2.hooks.register( a2.hooks.register(
HookType.ON_SUCCESS, "on_success",
reporter.report, reporter.report,
) )

View File

@ -12,7 +12,6 @@ from .command import Command
from .context import ExecutionContext, SharedContext from .context import ExecutionContext, SharedContext
from .execution_registry import ExecutionRegistry from .execution_registry import ExecutionRegistry
from .falyx import Falyx from .falyx import Falyx
from .hook_manager import HookType
logger = logging.getLogger("falyx") logger = logging.getLogger("falyx")

View File

@ -18,6 +18,7 @@ from .action_factory import ActionFactoryAction
from .http_action import HTTPAction from .http_action import HTTPAction
from .io_action import BaseIOAction, ShellAction from .io_action import BaseIOAction, ShellAction
from .menu_action import MenuAction from .menu_action import MenuAction
from .prompt_menu_action import PromptMenuAction
from .select_file_action import SelectFileAction from .select_file_action import SelectFileAction
from .selection_action import SelectionAction from .selection_action import SelectionAction
from .signal_action import SignalAction from .signal_action import SignalAction
@ -40,4 +41,5 @@ __all__ = [
"FallbackAction", "FallbackAction",
"LiteralInputAction", "LiteralInputAction",
"UserInputAction", "UserInputAction",
"PromptMenuAction",
] ]

View File

@ -47,6 +47,7 @@ from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import Hook, HookManager, HookType from falyx.hook_manager import Hook, HookManager, HookType
from falyx.logger import logger from falyx.logger import logger
from falyx.options_manager import OptionsManager from falyx.options_manager import OptionsManager
from falyx.parsers.utils import same_argument_definitions
from falyx.retry import RetryHandler, RetryPolicy from falyx.retry import RetryHandler, RetryPolicy
from falyx.themes import OneColors from falyx.themes import OneColors
from falyx.utils import ensure_async from falyx.utils import ensure_async
@ -62,7 +63,6 @@ class BaseAction(ABC):
into kwargs. into kwargs.
inject_into (str): The name of the kwarg key to inject the result as inject_into (str): The name of the kwarg key to inject the result as
(default: 'last_result'). (default: 'last_result').
_requires_injection (bool): Whether the action requires input injection.
""" """
def __init__( def __init__(
@ -82,7 +82,6 @@ class BaseAction(ABC):
self.inject_last_result: bool = inject_last_result self.inject_last_result: bool = inject_last_result
self.inject_into: str = inject_into self.inject_into: str = inject_into
self._never_prompt: bool = never_prompt self._never_prompt: bool = never_prompt
self._requires_injection: bool = False
self._skip_in_chain: bool = False self._skip_in_chain: bool = False
self.console = Console(color_system="auto") self.console = Console(color_system="auto")
self.options_manager: OptionsManager | None = None self.options_manager: OptionsManager | None = None
@ -101,6 +100,14 @@ class BaseAction(ABC):
async def preview(self, parent: Tree | None = None): async def preview(self, parent: Tree | None = None):
raise NotImplementedError("preview must be implemented by subclasses") raise NotImplementedError("preview must be implemented by subclasses")
@abstractmethod
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
"""
Returns the callable to be used for argument inference.
By default, it returns None.
"""
raise NotImplementedError("get_infer_target must be implemented by subclasses")
def set_options_manager(self, options_manager: OptionsManager) -> None: def set_options_manager(self, options_manager: OptionsManager) -> None:
self.options_manager = options_manager self.options_manager = options_manager
@ -154,10 +161,6 @@ class BaseAction(ABC):
async def _write_stdout(self, data: str) -> None: async def _write_stdout(self, data: str) -> None:
"""Override in subclasses that produce terminal output.""" """Override in subclasses that produce terminal output."""
def requires_io_injection(self) -> bool:
"""Checks to see if the action requires input injection."""
return self._requires_injection
def __repr__(self) -> str: def __repr__(self) -> str:
return str(self) return str(self)
@ -246,6 +249,13 @@ class Action(BaseAction):
if policy.enabled: if policy.enabled:
self.enable_retry() self.enable_retry()
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
"""
Returns the callable to be used for argument inference.
By default, it returns the action itself.
"""
return self.action, None
async def _run(self, *args, **kwargs) -> Any: async def _run(self, *args, **kwargs) -> Any:
combined_args = args + self.args combined_args = args + self.args
combined_kwargs = self._maybe_inject_last_result({**self.kwargs, **kwargs}) combined_kwargs = self._maybe_inject_last_result({**self.kwargs, **kwargs})
@ -477,6 +487,14 @@ class ChainedAction(BaseAction, ActionListMixin):
if hasattr(action, "register_teardown") and callable(action.register_teardown): if hasattr(action, "register_teardown") and callable(action.register_teardown):
action.register_teardown(self.hooks) action.register_teardown(self.hooks)
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
if self.actions:
return self.actions[0].get_infer_target()
return None, None
def _clear_args(self):
return (), {}
async def _run(self, *args, **kwargs) -> list[Any]: async def _run(self, *args, **kwargs) -> list[Any]:
if not self.actions: if not self.actions:
raise EmptyChainError(f"[{self.name}] No actions to execute.") raise EmptyChainError(f"[{self.name}] No actions to execute.")
@ -505,11 +523,7 @@ class ChainedAction(BaseAction, ActionListMixin):
continue continue
shared_context.current_index = index shared_context.current_index = index
prepared = action.prepare(shared_context, self.options_manager) prepared = action.prepare(shared_context, self.options_manager)
last_result = shared_context.last_result()
try: try:
if self.requires_io_injection() and last_result is not None:
result = await prepared(**{prepared.inject_into: last_result})
else:
result = await prepared(*args, **updated_kwargs) result = await prepared(*args, **updated_kwargs)
except Exception as error: except Exception as error:
if index + 1 < len(self.actions) and isinstance( if index + 1 < len(self.actions) and isinstance(
@ -529,6 +543,7 @@ class ChainedAction(BaseAction, ActionListMixin):
fallback._skip_in_chain = True fallback._skip_in_chain = True
else: else:
raise raise
args, updated_kwargs = self._clear_args()
shared_context.add_result(result) shared_context.add_result(result)
context.extra["results"].append(result) context.extra["results"].append(result)
context.extra["rollback_stack"].append(prepared) context.extra["rollback_stack"].append(prepared)
@ -669,6 +684,16 @@ class ActionGroup(BaseAction, ActionListMixin):
if hasattr(action, "register_teardown") and callable(action.register_teardown): if hasattr(action, "register_teardown") and callable(action.register_teardown):
action.register_teardown(self.hooks) action.register_teardown(self.hooks)
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
arg_defs = same_argument_definitions(self.actions)
if arg_defs:
return self.actions[0].get_infer_target()
logger.debug(
"[%s] auto_args disabled: mismatched ActionGroup arguments",
self.name,
)
return None, None
async def _run(self, *args, **kwargs) -> list[tuple[str, Any]]: async def _run(self, *args, **kwargs) -> list[tuple[str, Any]]:
shared_context = SharedContext(name=self.name, action=self, is_parallel=True) shared_context = SharedContext(name=self.name, action=self, is_parallel=True)
if self.shared_context: if self.shared_context:
@ -701,7 +726,7 @@ class ActionGroup(BaseAction, ActionListMixin):
if context.extra["errors"]: if context.extra["errors"]:
context.exception = Exception( context.exception = Exception(
f"{len(context.extra['errors'])} action(s) failed: " f"{len(context.extra['errors'])} action(s) failed: "
f"{' ,'.join(name for name, _ in context.extra["errors"])}" f"{' ,'.join(name for name, _ in context.extra['errors'])}"
) )
await self.hooks.trigger(HookType.ON_ERROR, context) await self.hooks.trigger(HookType.ON_ERROR, context)
raise context.exception raise context.exception
@ -787,8 +812,11 @@ class ProcessAction(BaseAction):
self.executor = executor or ProcessPoolExecutor() self.executor = executor or ProcessPoolExecutor()
self.is_retryable = True self.is_retryable = True
async def _run(self, *args, **kwargs): def get_infer_target(self) -> tuple[Callable[..., Any] | None, None]:
if self.inject_last_result: return self.action, None
async def _run(self, *args, **kwargs) -> Any:
if self.inject_last_result and self.shared_context:
last_result = self.shared_context.last_result() last_result = self.shared_context.last_result()
if not self._validate_pickleable(last_result): if not self._validate_pickleable(last_result):
raise ValueError( raise ValueError(

View File

@ -1,6 +1,6 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed # Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""action_factory.py""" """action_factory.py"""
from typing import Any from typing import Any, Callable
from rich.tree import Tree from rich.tree import Tree
@ -35,6 +35,8 @@ class ActionFactoryAction(BaseAction):
*, *,
inject_last_result: bool = False, inject_last_result: bool = False,
inject_into: str = "last_result", inject_into: str = "last_result",
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
preview_args: tuple[Any, ...] = (), preview_args: tuple[Any, ...] = (),
preview_kwargs: dict[str, Any] | None = None, preview_kwargs: dict[str, Any] | None = None,
): ):
@ -44,6 +46,8 @@ class ActionFactoryAction(BaseAction):
inject_into=inject_into, inject_into=inject_into,
) )
self.factory = factory self.factory = factory
self.args = args
self.kwargs = kwargs or {}
self.preview_args = preview_args self.preview_args = preview_args
self.preview_kwargs = preview_kwargs or {} self.preview_kwargs = preview_kwargs or {}
@ -55,7 +59,12 @@ class ActionFactoryAction(BaseAction):
def factory(self, value: ActionFactoryProtocol): def factory(self, value: ActionFactoryProtocol):
self._factory = ensure_async(value) self._factory = ensure_async(value)
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
return self.factory, None
async def _run(self, *args, **kwargs) -> Any: async def _run(self, *args, **kwargs) -> Any:
args = (*self.args, *args)
kwargs = {**self.kwargs, **kwargs}
updated_kwargs = self._maybe_inject_last_result(kwargs) updated_kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext( context = ExecutionContext(
name=f"{self.name} (factory)", name=f"{self.name} (factory)",
@ -85,7 +94,7 @@ class ActionFactoryAction(BaseAction):
) )
if self.options_manager: if self.options_manager:
generated_action.set_options_manager(self.options_manager) generated_action.set_options_manager(self.options_manager)
context.result = await generated_action(*args, **kwargs) context.result = await generated_action()
await self.hooks.trigger(HookType.ON_SUCCESS, context) await self.hooks.trigger(HookType.ON_SUCCESS, context)
return context.result return context.result
except Exception as error: except Exception as error:

View File

@ -19,7 +19,7 @@ import asyncio
import shlex import shlex
import subprocess import subprocess
import sys import sys
from typing import Any from typing import Any, Callable
from rich.tree import Tree from rich.tree import Tree
@ -73,7 +73,6 @@ class BaseIOAction(BaseAction):
inject_last_result=inject_last_result, inject_last_result=inject_last_result,
) )
self.mode = mode self.mode = mode
self._requires_injection = True
def from_input(self, raw: str | bytes) -> Any: def from_input(self, raw: str | bytes) -> Any:
raise NotImplementedError raise NotImplementedError
@ -81,15 +80,15 @@ class BaseIOAction(BaseAction):
def to_output(self, result: Any) -> str | bytes: def to_output(self, result: Any) -> str | bytes:
raise NotImplementedError raise NotImplementedError
async def _resolve_input(self, kwargs: dict[str, Any]) -> str | bytes: async def _resolve_input(
last_result = kwargs.pop(self.inject_into, None) self, args: tuple[Any], kwargs: dict[str, Any]
) -> str | bytes:
data = await self._read_stdin() data = await self._read_stdin()
if data: if data:
return self.from_input(data) return self.from_input(data)
if last_result is not None: if len(args) == 1:
return last_result return self.from_input(args[0])
if self.inject_last_result and self.shared_context: if self.inject_last_result and self.shared_context:
return self.shared_context.last_result() return self.shared_context.last_result()
@ -99,6 +98,9 @@ class BaseIOAction(BaseAction):
) )
raise FalyxError("No input provided and no last result to inject.") raise FalyxError("No input provided and no last result to inject.")
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
return None, None
async def __call__(self, *args, **kwargs): async def __call__(self, *args, **kwargs):
context = ExecutionContext( context = ExecutionContext(
name=self.name, name=self.name,
@ -117,8 +119,8 @@ class BaseIOAction(BaseAction):
pass pass
result = getattr(self, "_last_result", None) result = getattr(self, "_last_result", None)
else: else:
parsed_input = await self._resolve_input(kwargs) parsed_input = await self._resolve_input(args, kwargs)
result = await self._run(parsed_input, *args, **kwargs) result = await self._run(parsed_input)
output = self.to_output(result) output = self.to_output(result)
await self._write_stdout(output) await self._write_stdout(output)
context.result = result context.result = result
@ -195,7 +197,6 @@ class ShellAction(BaseIOAction):
- Captures stdout and stderr from shell execution - Captures stdout and stderr from shell execution
- Raises on non-zero exit codes with stderr as the error - Raises on non-zero exit codes with stderr as the error
- Result is returned as trimmed stdout string - Result is returned as trimmed stdout string
- Compatible with ChainedAction and Command.requires_input detection
Args: Args:
name (str): Name of the action. name (str): Name of the action.
@ -220,6 +221,11 @@ class ShellAction(BaseIOAction):
) )
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip() return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
if sys.stdin.isatty():
return self._run, {"parsed_input": {"help": self.command_template}}
return None, None
async def _run(self, parsed_input: str) -> str: async def _run(self, parsed_input: str) -> str:
# Replace placeholder in template, or use raw input as full command # Replace placeholder in template, or use raw input as full command
command = self.command_template.format(parsed_input) command = self.command_template.format(parsed_input)

View File

@ -73,6 +73,9 @@ class MenuAction(BaseAction):
table.add_row(*row) table.add_row(*row)
return table return table
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any: async def _run(self, *args, **kwargs) -> Any:
kwargs = self._maybe_inject_last_result(kwargs) kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext( context = ExecutionContext(

View File

@ -0,0 +1,134 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""prompt_menu_action.py"""
from typing import Any
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import FormattedText, merge_formatted_text
from rich.console import Console
from rich.tree import Tree
from falyx.action.action import BaseAction
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType
from falyx.logger import logger
from falyx.menu import MenuOptionMap
from falyx.signals import BackSignal, QuitSignal
from falyx.themes import OneColors
class PromptMenuAction(BaseAction):
"""PromptMenuAction class for creating prompt -> actions."""
def __init__(
self,
name: str,
menu_options: MenuOptionMap,
*,
prompt_message: str = "Select > ",
default_selection: str = "",
inject_last_result: bool = False,
inject_into: str = "last_result",
console: Console | None = None,
prompt_session: PromptSession | None = None,
never_prompt: bool = False,
include_reserved: bool = True,
):
super().__init__(
name,
inject_last_result=inject_last_result,
inject_into=inject_into,
never_prompt=never_prompt,
)
self.menu_options = menu_options
self.prompt_message = prompt_message
self.default_selection = default_selection
self.console = console or Console(color_system="auto")
self.prompt_session = prompt_session or PromptSession()
self.include_reserved = include_reserved
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any:
kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext(
name=self.name,
args=args,
kwargs=kwargs,
action=self,
)
effective_default = self.default_selection
maybe_result = str(self.last_result)
if maybe_result in self.menu_options:
effective_default = maybe_result
elif self.inject_last_result:
logger.warning(
"[%s] Injected last result '%s' not found in menu options",
self.name,
maybe_result,
)
if self.never_prompt and not effective_default:
raise ValueError(
f"[{self.name}] 'never_prompt' is True but no valid default_selection"
" was provided."
)
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
key = effective_default
if not self.never_prompt:
placeholder_formatted_text = []
for index, (key, option) in enumerate(self.menu_options.items()):
placeholder_formatted_text.append(option.render_prompt(key))
if index < len(self.menu_options) - 1:
placeholder_formatted_text.append(
FormattedText([(OneColors.WHITE, " | ")])
)
placeholder = merge_formatted_text(placeholder_formatted_text)
key = await self.prompt_session.prompt_async(
message=self.prompt_message, placeholder=placeholder
)
option = self.menu_options[key]
result = await option.action(*args, **kwargs)
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return result
except BackSignal:
logger.debug("[%s][BackSignal] ← Returning to previous menu", self.name)
return None
except QuitSignal:
logger.debug("[%s][QuitSignal] ← Exiting application", self.name)
raise
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
raise
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.LIGHT_YELLOW_b}]📋 PromptMenuAction[/] '{self.name}'"
tree = parent.add(label) if parent else Tree(label)
for key, option in self.menu_options.items():
tree.add(
f"[dim]{key}[/]: {option.description} → [italic]{option.action.name}[/]"
)
await option.action.preview(parent=tree)
if not parent:
self.console.print(tree)
def __str__(self) -> str:
return (
f"PromptMenuAction(name={self.name!r}, options={list(self.menu_options.keys())!r}, "
f"default_selection={self.default_selection!r}, "
f"include_reserved={self.include_reserved}, "
f"prompt={'off' if self.never_prompt else 'on'})"
)

View File

@ -25,6 +25,7 @@ from falyx.selection import (
prompt_for_selection, prompt_for_selection,
render_selection_dict_table, render_selection_dict_table,
) )
from falyx.signals import CancelSignal
from falyx.themes import OneColors from falyx.themes import OneColors
@ -121,6 +122,16 @@ class SelectFileAction(BaseAction):
logger.warning("[ERROR] Failed to parse %s: %s", file.name, error) logger.warning("[ERROR] Failed to parse %s: %s", file.name, error)
return options return options
def _find_cancel_key(self, options) -> str:
"""Return first numeric value not already used in the selection dict."""
for index in range(len(options)):
if str(index) not in options:
return str(index)
return str(len(options))
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any: async def _run(self, *args, **kwargs) -> Any:
context = ExecutionContext(name=self.name, args=args, kwargs=kwargs, action=self) context = ExecutionContext(name=self.name, args=args, kwargs=kwargs, action=self)
context.start_timer() context.start_timer()
@ -128,28 +139,38 @@ class SelectFileAction(BaseAction):
await self.hooks.trigger(HookType.BEFORE, context) await self.hooks.trigger(HookType.BEFORE, context)
files = [ files = [
f file
for f in self.directory.iterdir() for file in self.directory.iterdir()
if f.is_file() if file.is_file()
and (self.suffix_filter is None or f.suffix == self.suffix_filter) and (self.suffix_filter is None or file.suffix == self.suffix_filter)
] ]
if not files: if not files:
raise FileNotFoundError("No files found in directory.") raise FileNotFoundError("No files found in directory.")
options = self.get_options(files) options = self.get_options(files)
cancel_key = self._find_cancel_key(options)
cancel_option = {
cancel_key: SelectionOption(
description="Cancel", value=CancelSignal(), style=OneColors.DARK_RED
)
}
table = render_selection_dict_table( table = render_selection_dict_table(
title=self.title, selections=options, columns=self.columns title=self.title, selections=options | cancel_option, columns=self.columns
) )
key = await prompt_for_selection( key = await prompt_for_selection(
options.keys(), (options | cancel_option).keys(),
table, table,
console=self.console, console=self.console,
prompt_session=self.prompt_session, prompt_session=self.prompt_session,
prompt_message=self.prompt_message, prompt_message=self.prompt_message,
) )
if key == cancel_key:
raise CancelSignal("User canceled the selection.")
result = options[key].value result = options[key].value
context.result = result context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context) await self.hooks.trigger(HookType.ON_SUCCESS, context)
@ -176,11 +197,11 @@ class SelectFileAction(BaseAction):
try: try:
files = list(self.directory.iterdir()) files = list(self.directory.iterdir())
if self.suffix_filter: if self.suffix_filter:
files = [f for f in files if f.suffix == self.suffix_filter] files = [file for file in files if file.suffix == self.suffix_filter]
sample = files[:10] sample = files[:10]
file_list = tree.add("[dim]Files:[/]") file_list = tree.add("[dim]Files:[/]")
for f in sample: for file in sample:
file_list.add(f"[dim]{f.name}[/]") file_list.add(f"[dim]{file.name}[/]")
if len(files) > 10: if len(files) > 10:
file_list.add(f"[dim]... ({len(files) - 10} more)[/]") file_list.add(f"[dim]... ({len(files) - 10} more)[/]")
except Exception as error: except Exception as error:

View File

@ -1,5 +1,6 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed # Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""selection_action.py""" """selection_action.py"""
from copy import copy
from typing import Any from typing import Any
from prompt_toolkit import PromptSession from prompt_toolkit import PromptSession
@ -7,19 +8,21 @@ from rich.console import Console
from rich.tree import Tree from rich.tree import Tree
from falyx.action.action import BaseAction from falyx.action.action import BaseAction
from falyx.action.types import SelectionReturnType
from falyx.context import ExecutionContext from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType from falyx.hook_manager import HookType
from falyx.logger import logger from falyx.logger import logger
from falyx.selection import ( from falyx.selection import (
SelectionOption, SelectionOption,
SelectionOptionMap,
prompt_for_index, prompt_for_index,
prompt_for_selection, prompt_for_selection,
render_selection_dict_table, render_selection_dict_table,
render_selection_indexed_table, render_selection_indexed_table,
) )
from falyx.signals import CancelSignal
from falyx.themes import OneColors from falyx.themes import OneColors
from falyx.utils import CaseInsensitiveDict
class SelectionAction(BaseAction): class SelectionAction(BaseAction):
@ -34,7 +37,13 @@ class SelectionAction(BaseAction):
def __init__( def __init__(
self, self,
name: str, name: str,
selections: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption], selections: (
list[str]
| set[str]
| tuple[str, ...]
| dict[str, SelectionOption]
| dict[str, Any]
),
*, *,
title: str = "Select an option", title: str = "Select an option",
columns: int = 5, columns: int = 5,
@ -42,7 +51,7 @@ class SelectionAction(BaseAction):
default_selection: str = "", default_selection: str = "",
inject_last_result: bool = False, inject_last_result: bool = False,
inject_into: str = "last_result", inject_into: str = "last_result",
return_key: bool = False, return_type: SelectionReturnType | str = "value",
console: Console | None = None, console: Console | None = None,
prompt_session: PromptSession | None = None, prompt_session: PromptSession | None = None,
never_prompt: bool = False, never_prompt: bool = False,
@ -55,8 +64,8 @@ class SelectionAction(BaseAction):
never_prompt=never_prompt, never_prompt=never_prompt,
) )
# Setter normalizes to correct type, mypy can't infer that # Setter normalizes to correct type, mypy can't infer that
self.selections: list[str] | CaseInsensitiveDict = selections # type: ignore[assignment] self.selections: list[str] | SelectionOptionMap = selections # type: ignore[assignment]
self.return_key = return_key self.return_type: SelectionReturnType = self._coerce_return_type(return_type)
self.title = title self.title = title
self.columns = columns self.columns = columns
self.console = console or Console(color_system="auto") self.console = console or Console(color_system="auto")
@ -64,9 +73,17 @@ class SelectionAction(BaseAction):
self.default_selection = default_selection self.default_selection = default_selection
self.prompt_message = prompt_message self.prompt_message = prompt_message
self.show_table = show_table self.show_table = show_table
self.cancel_key = self._find_cancel_key()
def _coerce_return_type(
self, return_type: SelectionReturnType | str
) -> SelectionReturnType:
if isinstance(return_type, SelectionReturnType):
return return_type
return SelectionReturnType(return_type)
@property @property
def selections(self) -> list[str] | CaseInsensitiveDict: def selections(self) -> list[str] | SelectionOptionMap:
return self._selections return self._selections
@selections.setter @selections.setter
@ -74,17 +91,69 @@ class SelectionAction(BaseAction):
self, value: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption] self, value: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption]
): ):
if isinstance(value, (list, tuple, set)): if isinstance(value, (list, tuple, set)):
self._selections: list[str] | CaseInsensitiveDict = list(value) self._selections: list[str] | SelectionOptionMap = list(value)
elif isinstance(value, dict): elif isinstance(value, dict):
cid = CaseInsensitiveDict() som = SelectionOptionMap()
cid.update(value) if all(isinstance(key, str) for key in value) and all(
self._selections = cid not isinstance(value[key], SelectionOption) for key in value
):
som.update(
{
str(index): SelectionOption(key, option)
for index, (key, option) in enumerate(value.items())
}
)
elif all(isinstance(key, str) for key in value) and all(
isinstance(value[key], SelectionOption) for key in value
):
som.update(value)
else:
raise ValueError("Invalid dictionary format. Keys must be strings")
self._selections = som
else: else:
raise TypeError( raise TypeError(
"'selections' must be a list[str] or dict[str, SelectionOption], " "'selections' must be a list[str] or dict[str, SelectionOption], "
f"got {type(value).__name__}" f"got {type(value).__name__}"
) )
def _find_cancel_key(self) -> str:
"""Find the cancel key in the selections."""
if isinstance(self.selections, dict):
for index in range(len(self.selections) + 1):
if str(index) not in self.selections:
return str(index)
return str(len(self.selections))
@property
def cancel_key(self) -> str:
return self._cancel_key
@cancel_key.setter
def cancel_key(self, value: str) -> None:
"""Set the cancel key for the selection."""
if not isinstance(value, str):
raise TypeError("Cancel key must be a string.")
if isinstance(self.selections, dict) and value in self.selections:
raise ValueError(
"Cancel key cannot be one of the selection keys. "
f"Current selections: {self.selections}"
)
if isinstance(self.selections, list):
if not value.isdigit() or int(value) > len(self.selections):
raise ValueError(
"cancel_key must be a digit and not greater than the number of selections."
)
self._cancel_key = value
def cancel_formatter(self, index: int, selection: str) -> str:
"""Format the cancel option for display."""
if self.cancel_key == str(index):
return f"[{index}] [{OneColors.DARK_RED}]Cancel[/]"
return f"[{index}] {selection}"
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> Any: async def _run(self, *args, **kwargs) -> Any:
kwargs = self._maybe_inject_last_result(kwargs) kwargs = self._maybe_inject_last_result(kwargs)
context = ExecutionContext( context = ExecutionContext(
@ -125,16 +194,18 @@ class SelectionAction(BaseAction):
context.start_timer() context.start_timer()
try: try:
self.cancel_key = self._find_cancel_key()
await self.hooks.trigger(HookType.BEFORE, context) await self.hooks.trigger(HookType.BEFORE, context)
if isinstance(self.selections, list): if isinstance(self.selections, list):
table = render_selection_indexed_table( table = render_selection_indexed_table(
title=self.title, title=self.title,
selections=self.selections, selections=self.selections + ["Cancel"],
columns=self.columns, columns=self.columns,
formatter=self.cancel_formatter,
) )
if not self.never_prompt: if not self.never_prompt:
index = await prompt_for_index( index: int | str = await prompt_for_index(
len(self.selections) - 1, len(self.selections),
table, table,
default_selection=effective_default, default_selection=effective_default,
console=self.console, console=self.console,
@ -144,14 +215,23 @@ class SelectionAction(BaseAction):
) )
else: else:
index = effective_default index = effective_default
result = self.selections[int(index)] if int(index) == int(self.cancel_key):
raise CancelSignal("User cancelled the selection.")
result: Any = self.selections[int(index)]
elif isinstance(self.selections, dict): elif isinstance(self.selections, dict):
cancel_option = {
self.cancel_key: SelectionOption(
description="Cancel", value=CancelSignal, style=OneColors.DARK_RED
)
}
table = render_selection_dict_table( table = render_selection_dict_table(
title=self.title, selections=self.selections, columns=self.columns title=self.title,
selections=self.selections | cancel_option,
columns=self.columns,
) )
if not self.never_prompt: if not self.never_prompt:
key = await prompt_for_selection( key = await prompt_for_selection(
self.selections.keys(), (self.selections | cancel_option).keys(),
table, table,
default_selection=effective_default, default_selection=effective_default,
console=self.console, console=self.console,
@ -161,10 +241,25 @@ class SelectionAction(BaseAction):
) )
else: else:
key = effective_default key = effective_default
result = key if self.return_key else self.selections[key].value if key == self.cancel_key:
raise CancelSignal("User cancelled the selection.")
if self.return_type == SelectionReturnType.KEY:
result = key
elif self.return_type == SelectionReturnType.VALUE:
result = self.selections[key].value
elif self.return_type == SelectionReturnType.ITEMS:
result = {key: self.selections[key]}
elif self.return_type == SelectionReturnType.DESCRIPTION:
result = self.selections[key].description
elif self.return_type == SelectionReturnType.DESCRIPTION_VALUE:
result = {
self.selections[key].description: self.selections[key].value
}
else:
raise ValueError(f"Unsupported return type: {self.return_type}")
else: else:
raise TypeError( raise TypeError(
"'selections' must be a list[str] or dict[str, tuple[str, Any]], " "'selections' must be a list[str] or dict[str, Any], "
f"got {type(self.selections).__name__}" f"got {type(self.selections).__name__}"
) )
context.result = result context.result = result
@ -203,7 +298,7 @@ class SelectionAction(BaseAction):
return return
tree.add(f"[dim]Default:[/] '{self.default_selection or self.last_result}'") tree.add(f"[dim]Default:[/] '{self.default_selection or self.last_result}'")
tree.add(f"[dim]Return:[/] {'Key' if self.return_key else 'Value'}") tree.add(f"[dim]Return:[/] {self.return_type.name.capitalize()}")
tree.add(f"[dim]Prompt:[/] {'Disabled' if self.never_prompt else 'Enabled'}") tree.add(f"[dim]Prompt:[/] {'Disabled' if self.never_prompt else 'Enabled'}")
if not parent: if not parent:
@ -218,6 +313,6 @@ class SelectionAction(BaseAction):
return ( return (
f"SelectionAction(name={self.name!r}, type={selection_type}, " f"SelectionAction(name={self.name!r}, type={selection_type}, "
f"default_selection={self.default_selection!r}, " f"default_selection={self.default_selection!r}, "
f"return_key={self.return_key}, " f"return_type={self.return_type!r}, "
f"prompt={'off' if self.never_prompt else 'on'})" f"prompt={'off' if self.never_prompt else 'on'})"
) )

View File

@ -35,3 +35,18 @@ class FileReturnType(Enum):
return member return member
valid = ", ".join(member.value for member in cls) valid = ", ".join(member.value for member in cls)
raise ValueError(f"Invalid FileReturnType: '{value}'. Must be one of: {valid}") raise ValueError(f"Invalid FileReturnType: '{value}'. Must be one of: {valid}")
class SelectionReturnType(Enum):
"""Enum for dictionary return types."""
KEY = "key"
VALUE = "value"
DESCRIPTION = "description"
DESCRIPTION_VALUE = "description_value"
ITEMS = "items"
@classmethod
def _missing_(cls, value: object) -> SelectionReturnType:
valid = ", ".join(member.value for member in cls)
raise ValueError(f"Invalid DictReturnType: '{value}'. Must be one of: {valid}")

View File

@ -43,6 +43,9 @@ class UserInputAction(BaseAction):
self.console = console or Console(color_system="auto") self.console = console or Console(color_system="auto")
self.prompt_session = prompt_session or PromptSession() self.prompt_session = prompt_session or PromptSession()
def get_infer_target(self) -> tuple[None, None]:
return None, None
async def _run(self, *args, **kwargs) -> str: async def _run(self, *args, **kwargs) -> str:
context = ExecutionContext( context = ExecutionContext(
name=self.name, name=self.name,

View File

@ -19,7 +19,6 @@ in building robust interactive menus.
from __future__ import annotations from __future__ import annotations
import shlex import shlex
from functools import cached_property
from typing import Any, Callable from typing import Any, Callable
from prompt_toolkit.formatted_text import FormattedText from prompt_toolkit.formatted_text import FormattedText
@ -27,25 +26,15 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from rich.console import Console from rich.console import Console
from rich.tree import Tree from rich.tree import Tree
from falyx.action.action import ( from falyx.action.action import Action, BaseAction
Action,
ActionGroup,
BaseAction,
ChainedAction,
ProcessAction,
)
from falyx.action.io_action import BaseIOAction
from falyx.context import ExecutionContext from falyx.context import ExecutionContext
from falyx.debug import register_debug_hooks from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType from falyx.hook_manager import HookManager, HookType
from falyx.logger import logger from falyx.logger import logger
from falyx.options_manager import OptionsManager from falyx.options_manager import OptionsManager
from falyx.parsers import ( from falyx.parsers.argparse import CommandArgumentParser
CommandArgumentParser, from falyx.parsers.signature import infer_args_from_func
infer_args_from_func,
same_argument_definitions,
)
from falyx.prompt_utils import confirm_async, should_prompt_user from falyx.prompt_utils import confirm_async, should_prompt_user
from falyx.protocols import ArgParserProtocol from falyx.protocols import ArgParserProtocol
from falyx.retry import RetryPolicy from falyx.retry import RetryPolicy
@ -99,7 +88,6 @@ class Command(BaseModel):
retry_policy (RetryPolicy): Retry behavior configuration. retry_policy (RetryPolicy): Retry behavior configuration.
tags (list[str]): Organizational tags for the command. tags (list[str]): Organizational tags for the command.
logging_hooks (bool): Whether to attach logging hooks automatically. logging_hooks (bool): Whether to attach logging hooks automatically.
requires_input (bool | None): Indicates if the action needs input.
options_manager (OptionsManager): Manages global command-line options. options_manager (OptionsManager): Manages global command-line options.
arg_parser (CommandArgumentParser): Parses command arguments. arg_parser (CommandArgumentParser): Parses command arguments.
custom_parser (ArgParserProtocol | None): Custom argument parser. custom_parser (ArgParserProtocol | None): Custom argument parser.
@ -116,7 +104,7 @@ class Command(BaseModel):
key: str key: str
description: str description: str
action: BaseAction | Callable[[Any], Any] action: BaseAction | Callable[..., Any]
args: tuple = () args: tuple = ()
kwargs: dict[str, Any] = Field(default_factory=dict) kwargs: dict[str, Any] = Field(default_factory=dict)
hidden: bool = False hidden: bool = False
@ -138,24 +126,23 @@ class Command(BaseModel):
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy) retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
tags: list[str] = Field(default_factory=list) tags: list[str] = Field(default_factory=list)
logging_hooks: bool = False logging_hooks: bool = False
requires_input: bool | None = None
options_manager: OptionsManager = Field(default_factory=OptionsManager) options_manager: OptionsManager = Field(default_factory=OptionsManager)
arg_parser: CommandArgumentParser = Field(default_factory=CommandArgumentParser) arg_parser: CommandArgumentParser = Field(default_factory=CommandArgumentParser)
arguments: list[dict[str, Any]] = Field(default_factory=list) arguments: list[dict[str, Any]] = Field(default_factory=list)
argument_config: Callable[[CommandArgumentParser], None] | None = None argument_config: Callable[[CommandArgumentParser], None] | None = None
custom_parser: ArgParserProtocol | None = None custom_parser: ArgParserProtocol | None = None
custom_help: Callable[[], str | None] | None = None custom_help: Callable[[], str | None] | None = None
auto_args: bool = False auto_args: bool = True
arg_metadata: dict[str, str | dict[str, Any]] = Field(default_factory=dict) arg_metadata: dict[str, str | dict[str, Any]] = Field(default_factory=dict)
_context: ExecutionContext | None = PrivateAttr(default=None) _context: ExecutionContext | None = PrivateAttr(default=None)
model_config = ConfigDict(arbitrary_types_allowed=True) model_config = ConfigDict(arbitrary_types_allowed=True)
def parse_args( async def parse_args(
self, raw_args: list[str] | str, from_validate: bool = False self, raw_args: list[str] | str, from_validate: bool = False
) -> tuple[tuple, dict]: ) -> tuple[tuple, dict]:
if self.custom_parser: if callable(self.custom_parser):
if isinstance(raw_args, str): if isinstance(raw_args, str):
try: try:
raw_args = shlex.split(raw_args) raw_args = shlex.split(raw_args)
@ -178,7 +165,9 @@ class Command(BaseModel):
raw_args, raw_args,
) )
return ((), {}) return ((), {})
return self.arg_parser.parse_args_split(raw_args, from_validate=from_validate) return await self.arg_parser.parse_args_split(
raw_args, from_validate=from_validate
)
@field_validator("action", mode="before") @field_validator("action", mode="before")
@classmethod @classmethod
@ -192,28 +181,15 @@ class Command(BaseModel):
def get_argument_definitions(self) -> list[dict[str, Any]]: def get_argument_definitions(self) -> list[dict[str, Any]]:
if self.arguments: if self.arguments:
return self.arguments return self.arguments
elif self.argument_config: elif callable(self.argument_config):
self.argument_config(self.arg_parser) self.argument_config(self.arg_parser)
elif self.auto_args: elif self.auto_args:
if isinstance(self.action, (Action, ProcessAction)): if isinstance(self.action, BaseAction):
return infer_args_from_func(self.action.action, self.arg_metadata) infer_target, maybe_metadata = self.action.get_infer_target()
elif isinstance(self.action, ChainedAction): # merge metadata with the action's metadata if not already in self.arg_metadata
if self.action.actions: if maybe_metadata:
action = self.action.actions[0] self.arg_metadata = {**maybe_metadata, **self.arg_metadata}
if isinstance(action, Action): return infer_args_from_func(infer_target, self.arg_metadata)
return infer_args_from_func(action.action, self.arg_metadata)
elif callable(action):
return infer_args_from_func(action, self.arg_metadata)
elif isinstance(self.action, ActionGroup):
arg_defs = same_argument_definitions(
self.action.actions, self.arg_metadata
)
if arg_defs:
return arg_defs
logger.debug(
"[Command:%s] auto_args disabled: mismatched ActionGroup arguments",
self.key,
)
elif callable(self.action): elif callable(self.action):
return infer_args_from_func(self.action, self.arg_metadata) return infer_args_from_func(self.action, self.arg_metadata)
return [] return []
@ -241,30 +217,9 @@ class Command(BaseModel):
if self.logging_hooks and isinstance(self.action, BaseAction): if self.logging_hooks and isinstance(self.action, BaseAction):
register_debug_hooks(self.action.hooks) register_debug_hooks(self.action.hooks)
if self.requires_input is None and self.detect_requires_input:
self.requires_input = True
self.hidden = True
elif self.requires_input is None:
self.requires_input = False
for arg_def in self.get_argument_definitions(): for arg_def in self.get_argument_definitions():
self.arg_parser.add_argument(*arg_def.pop("flags"), **arg_def) self.arg_parser.add_argument(*arg_def.pop("flags"), **arg_def)
@cached_property
def detect_requires_input(self) -> bool:
"""Detect if the action requires input based on its type."""
if isinstance(self.action, BaseIOAction):
return True
elif isinstance(self.action, ChainedAction):
return (
isinstance(self.action.actions[0], BaseIOAction)
if self.action.actions
else False
)
elif isinstance(self.action, ActionGroup):
return any(isinstance(action, BaseIOAction) for action in self.action.actions)
return False
def _inject_options_manager(self) -> None: def _inject_options_manager(self) -> None:
"""Inject the options manager into the action if applicable.""" """Inject the options manager into the action if applicable."""
if isinstance(self.action, BaseAction): if isinstance(self.action, BaseAction):
@ -357,7 +312,7 @@ class Command(BaseModel):
def show_help(self) -> bool: def show_help(self) -> bool:
"""Display the help message for the command.""" """Display the help message for the command."""
if self.custom_help: if callable(self.custom_help):
output = self.custom_help() output = self.custom_help()
if output: if output:
console.print(output) console.print(output)

View File

@ -98,7 +98,6 @@ class RawCommand(BaseModel):
retry: bool = False retry: bool = False
retry_all: bool = False retry_all: bool = False
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy) retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
requires_input: bool | None = None
hidden: bool = False hidden: bool = False
help_text: str = "" help_text: str = ""

View File

@ -61,9 +61,9 @@ from falyx.options_manager import OptionsManager
from falyx.parsers import CommandArgumentParser, get_arg_parsers from falyx.parsers import CommandArgumentParser, get_arg_parsers
from falyx.protocols import ArgParserProtocol from falyx.protocols import ArgParserProtocol
from falyx.retry import RetryPolicy from falyx.retry import RetryPolicy
from falyx.signals import BackSignal, CancelSignal, FlowSignal, HelpSignal, QuitSignal from falyx.signals import BackSignal, CancelSignal, HelpSignal, QuitSignal
from falyx.themes import OneColors, get_nord_theme from falyx.themes import OneColors, get_nord_theme
from falyx.utils import CaseInsensitiveDict, _noop, chunks, get_program_invocation from falyx.utils import CaseInsensitiveDict, _noop, chunks
from falyx.version import __version__ from falyx.version import __version__
@ -83,14 +83,17 @@ class CommandValidator(Validator):
self.error_message = error_message self.error_message = error_message
def validate(self, document) -> None: def validate(self, document) -> None:
pass
async def validate_async(self, document) -> None:
text = document.text text = document.text
is_preview, choice, _, __ = self.falyx.get_command(text, from_validate=True) is_preview, choice, _, __ = await self.falyx.get_command(text, from_validate=True)
if is_preview: if is_preview:
return None return None
if not choice: if not choice:
raise ValidationError( raise ValidationError(
message=self.error_message, message=self.error_message,
cursor_position=document.get_end_of_document_position(), cursor_position=len(text),
) )
@ -111,6 +114,8 @@ class Falyx:
- Submenu nesting and action chaining - Submenu nesting and action chaining
- History tracking, help generation, and run key execution modes - History tracking, help generation, and run key execution modes
- Seamless CLI argument parsing and integration via argparse - Seamless CLI argument parsing and integration via argparse
- Declarative option management with OptionsManager
- Command level argument parsing and validation
- Extensible with user-defined hooks, bottom bars, and custom layouts - Extensible with user-defined hooks, bottom bars, and custom layouts
Args: Args:
@ -126,7 +131,7 @@ class Falyx:
never_prompt (bool): Seed default for `OptionsManager["never_prompt"]` never_prompt (bool): Seed default for `OptionsManager["never_prompt"]`
force_confirm (bool): Seed default for `OptionsManager["force_confirm"]` force_confirm (bool): Seed default for `OptionsManager["force_confirm"]`
cli_args (Namespace | None): Parsed CLI arguments, usually from argparse. cli_args (Namespace | None): Parsed CLI arguments, usually from argparse.
options (OptionsManager | None): Declarative option mappings. options (OptionsManager | None): Declarative option mappings for global state.
custom_table (Callable[[Falyx], Table] | Table | None): Custom menu table custom_table (Callable[[Falyx], Table] | Table | None): Custom menu table
generator. generator.
@ -158,8 +163,9 @@ class Falyx:
force_confirm: bool = False, force_confirm: bool = False,
cli_args: Namespace | None = None, cli_args: Namespace | None = None,
options: OptionsManager | None = None, options: OptionsManager | None = None,
render_menu: Callable[["Falyx"], None] | None = None, render_menu: Callable[[Falyx], None] | None = None,
custom_table: Callable[["Falyx"], Table] | Table | None = None, custom_table: Callable[[Falyx], Table] | Table | None = None,
hide_menu_table: bool = False,
) -> None: ) -> None:
"""Initializes the Falyx object.""" """Initializes the Falyx object."""
self.title: str | Markdown = title self.title: str | Markdown = title
@ -183,8 +189,9 @@ class Falyx:
self._never_prompt: bool = never_prompt self._never_prompt: bool = never_prompt
self._force_confirm: bool = force_confirm self._force_confirm: bool = force_confirm
self.cli_args: Namespace | None = cli_args self.cli_args: Namespace | None = cli_args
self.render_menu: Callable[["Falyx"], None] | None = render_menu self.render_menu: Callable[[Falyx], None] | None = render_menu
self.custom_table: Callable[["Falyx"], Table] | Table | None = custom_table self.custom_table: Callable[[Falyx], Table] | Table | None = custom_table
self._hide_menu_table: bool = hide_menu_table
self.validate_options(cli_args, options) self.validate_options(cli_args, options)
self._prompt_session: PromptSession | None = None self._prompt_session: PromptSession | None = None
self.mode = FalyxMode.MENU self.mode = FalyxMode.MENU
@ -287,8 +294,6 @@ class Falyx:
for command in self.commands.values(): for command in self.commands.values():
help_text = command.help_text or command.description help_text = command.help_text or command.description
if command.requires_input:
help_text += " [dim](requires input)[/dim]"
table.add_row( table.add_row(
f"[{command.style}]{command.key}[/]", f"[{command.style}]{command.key}[/]",
", ".join(command.aliases) if command.aliases else "", ", ".join(command.aliases) if command.aliases else "",
@ -445,7 +450,6 @@ class Falyx:
bottom_toolbar=self._get_bottom_bar_render(), bottom_toolbar=self._get_bottom_bar_render(),
key_bindings=self.key_bindings, key_bindings=self.key_bindings,
validate_while_typing=False, validate_while_typing=False,
interrupt_exception=FlowSignal,
) )
return self._prompt_session return self._prompt_session
@ -526,7 +530,7 @@ class Falyx:
key: str = "X", key: str = "X",
description: str = "Exit", description: str = "Exit",
aliases: list[str] | None = None, aliases: list[str] | None = None,
action: Callable[[Any], Any] | None = None, action: Callable[..., Any] | None = None,
style: str = OneColors.DARK_RED, style: str = OneColors.DARK_RED,
confirm: bool = False, confirm: bool = False,
confirm_message: str = "Are you sure?", confirm_message: str = "Are you sure?",
@ -580,7 +584,7 @@ class Falyx:
self, self,
key: str, key: str,
description: str, description: str,
action: BaseAction | Callable[[Any], Any], action: BaseAction | Callable[..., Any],
*, *,
args: tuple = (), args: tuple = (),
kwargs: dict[str, Any] | None = None, kwargs: dict[str, Any] | None = None,
@ -608,13 +612,12 @@ class Falyx:
retry: bool = False, retry: bool = False,
retry_all: bool = False, retry_all: bool = False,
retry_policy: RetryPolicy | None = None, retry_policy: RetryPolicy | None = None,
requires_input: bool | None = None,
arg_parser: CommandArgumentParser | None = None, arg_parser: CommandArgumentParser | None = None,
arguments: list[dict[str, Any]] | None = None, arguments: list[dict[str, Any]] | None = None,
argument_config: Callable[[CommandArgumentParser], None] | None = None, argument_config: Callable[[CommandArgumentParser], None] | None = None,
custom_parser: ArgParserProtocol | None = None, custom_parser: ArgParserProtocol | None = None,
custom_help: Callable[[], str | None] | None = None, custom_help: Callable[[], str | None] | None = None,
auto_args: bool = False, auto_args: bool = True,
arg_metadata: dict[str, str | dict[str, Any]] | None = None, arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> Command: ) -> Command:
"""Adds an command to the menu, preventing duplicates.""" """Adds an command to the menu, preventing duplicates."""
@ -660,7 +663,6 @@ class Falyx:
retry=retry, retry=retry,
retry_all=retry_all, retry_all=retry_all,
retry_policy=retry_policy or RetryPolicy(), retry_policy=retry_policy or RetryPolicy(),
requires_input=requires_input,
options_manager=self.options, options_manager=self.options,
arg_parser=arg_parser, arg_parser=arg_parser,
arguments=arguments or [], arguments=arguments or [],
@ -741,7 +743,7 @@ class Falyx:
return True, input_str[1:].strip() return True, input_str[1:].strip()
return False, input_str.strip() return False, input_str.strip()
def get_command( async def get_command(
self, raw_choices: str, from_validate=False self, raw_choices: str, from_validate=False
) -> tuple[bool, Command | None, tuple, dict[str, Any]]: ) -> tuple[bool, Command | None, tuple, dict[str, Any]]:
""" """
@ -768,12 +770,15 @@ class Falyx:
choice = choice.upper() choice = choice.upper()
name_map = self._name_map name_map = self._name_map
if choice in name_map: if name_map.get(choice):
if not from_validate: if not from_validate:
logger.info("Command '%s' selected.", choice) logger.info("Command '%s' selected.", choice)
if input_args and name_map[choice].arg_parser: if is_preview:
return True, name_map[choice], args, kwargs
try: try:
args, kwargs = name_map[choice].parse_args(input_args, from_validate) args, kwargs = await name_map[choice].parse_args(
input_args, from_validate
)
except CommandArgumentError as error: except CommandArgumentError as error:
if not from_validate: if not from_validate:
if not name_map[choice].show_help(): if not name_map[choice].show_help():
@ -834,7 +839,7 @@ class Falyx:
"""Processes the action of the selected command.""" """Processes the action of the selected command."""
with patch_stdout(raw=True): with patch_stdout(raw=True):
choice = await self.prompt_session.prompt_async() choice = await self.prompt_session.prompt_async()
is_preview, selected_command, args, kwargs = self.get_command(choice) is_preview, selected_command, args, kwargs = await self.get_command(choice)
if not selected_command: if not selected_command:
logger.info("Invalid command '%s'.", choice) logger.info("Invalid command '%s'.", choice)
return True return True
@ -844,15 +849,6 @@ class Falyx:
await selected_command.preview() await selected_command.preview()
return True return True
if selected_command.requires_input:
program = get_program_invocation()
self.console.print(
f"[{OneColors.LIGHT_YELLOW}]⚠️ Command '{selected_command.key}' requires"
f" input and must be run via [{OneColors.MAGENTA}]'{program} run"
f"'[{OneColors.LIGHT_YELLOW}] with proper piping or arguments.[/]"
)
return True
self.last_run_command = selected_command self.last_run_command = selected_command
if selected_command == self.exit_command: if selected_command == self.exit_command:
@ -885,7 +881,7 @@ class Falyx:
) -> Any: ) -> Any:
"""Run a command by key without displaying the menu (non-interactive mode).""" """Run a command by key without displaying the menu (non-interactive mode)."""
self.debug_hooks() self.debug_hooks()
is_preview, selected_command, _, __ = self.get_command(command_key) is_preview, selected_command, _, __ = await self.get_command(command_key)
kwargs = kwargs or {} kwargs = kwargs or {}
self.last_run_command = selected_command self.last_run_command = selected_command
@ -984,6 +980,7 @@ class Falyx:
self.print_message(self.welcome_message) self.print_message(self.welcome_message)
try: try:
while True: while True:
if not self.options.get("hide_menu_table", self._hide_menu_table):
if callable(self.render_menu): if callable(self.render_menu):
self.render_menu(self) self.render_menu(self)
else: else:
@ -1020,6 +1017,9 @@ class Falyx:
if not self.options.get("force_confirm"): if not self.options.get("force_confirm"):
self.options.set("force_confirm", self._force_confirm) self.options.set("force_confirm", self._force_confirm)
if not self.options.get("hide_menu_table"):
self.options.set("hide_menu_table", self._hide_menu_table)
if self.cli_args.verbose: if self.cli_args.verbose:
logging.getLogger("falyx").setLevel(logging.DEBUG) logging.getLogger("falyx").setLevel(logging.DEBUG)
@ -1037,7 +1037,7 @@ class Falyx:
if self.cli_args.command == "preview": if self.cli_args.command == "preview":
self.mode = FalyxMode.PREVIEW self.mode = FalyxMode.PREVIEW
_, command, args, kwargs = self.get_command(self.cli_args.name) _, command, args, kwargs = await self.get_command(self.cli_args.name)
if not command: if not command:
self.console.print( self.console.print(
f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found." f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found."
@ -1051,7 +1051,7 @@ class Falyx:
if self.cli_args.command == "run": if self.cli_args.command == "run":
self.mode = FalyxMode.RUN self.mode = FalyxMode.RUN
is_preview, command, _, __ = self.get_command(self.cli_args.name) is_preview, command, _, __ = await self.get_command(self.cli_args.name)
if is_preview: if is_preview:
if command is None: if command is None:
sys.exit(1) sys.exit(1)
@ -1062,7 +1062,7 @@ class Falyx:
sys.exit(1) sys.exit(1)
self._set_retry_policy(command) self._set_retry_policy(command)
try: try:
args, kwargs = command.parse_args(self.cli_args.command_args) args, kwargs = await command.parse_args(self.cli_args.command_args)
except HelpSignal: except HelpSignal:
sys.exit(0) sys.exit(0)
try: try:

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import inspect import inspect
from enum import Enum from enum import Enum
from typing import Awaitable, Callable, Dict, List, Optional, Union from typing import Awaitable, Callable, Union
from falyx.context import ExecutionContext from falyx.context import ExecutionContext
from falyx.logger import logger from falyx.logger import logger
@ -24,7 +24,7 @@ class HookType(Enum):
ON_TEARDOWN = "on_teardown" ON_TEARDOWN = "on_teardown"
@classmethod @classmethod
def choices(cls) -> List[HookType]: def choices(cls) -> list[HookType]:
"""Return a list of all hook type choices.""" """Return a list of all hook type choices."""
return list(cls) return list(cls)
@ -37,16 +37,17 @@ class HookManager:
"""HookManager""" """HookManager"""
def __init__(self) -> None: def __init__(self) -> None:
self._hooks: Dict[HookType, List[Hook]] = { self._hooks: dict[HookType, list[Hook]] = {
hook_type: [] for hook_type in HookType hook_type: [] for hook_type in HookType
} }
def register(self, hook_type: HookType, hook: Hook): def register(self, hook_type: HookType | str, hook: Hook):
if hook_type not in HookType: """Raises ValueError if the hook type is not supported."""
raise ValueError(f"Unsupported hook type: {hook_type}") if not isinstance(hook_type, HookType):
hook_type = HookType(hook_type)
self._hooks[hook_type].append(hook) self._hooks[hook_type].append(hook)
def clear(self, hook_type: Optional[HookType] = None): def clear(self, hook_type: HookType | None = None):
if hook_type: if hook_type:
self._hooks[hook_type] = [] self._hooks[hook_type] = []
else: else:

View File

@ -2,6 +2,8 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from prompt_toolkit.formatted_text import FormattedText
from falyx.action import BaseAction from falyx.action import BaseAction
from falyx.signals import BackSignal, QuitSignal from falyx.signals import BackSignal, QuitSignal
from falyx.themes import OneColors from falyx.themes import OneColors
@ -26,6 +28,12 @@ class MenuOption:
"""Render the menu option for display.""" """Render the menu option for display."""
return f"[{OneColors.WHITE}][{key}][/] [{self.style}]{self.description}[/]" return f"[{OneColors.WHITE}][{key}][/] [{self.style}]{self.description}[/]"
def render_prompt(self, key: str) -> FormattedText:
"""Render the menu option for prompt display."""
return FormattedText(
[(OneColors.WHITE, f"[{key}] "), (self.style, self.description)]
)
class MenuOptionMap(CaseInsensitiveDict): class MenuOptionMap(CaseInsensitiveDict):
""" """
@ -33,7 +41,7 @@ class MenuOptionMap(CaseInsensitiveDict):
and special signal entries like Quit and Back. and special signal entries like Quit and Back.
""" """
RESERVED_KEYS = {"Q", "B"} RESERVED_KEYS = {"B", "X"}
def __init__( def __init__(
self, self,
@ -49,14 +57,14 @@ class MenuOptionMap(CaseInsensitiveDict):
def _inject_reserved_defaults(self): def _inject_reserved_defaults(self):
from falyx.action import SignalAction from falyx.action import SignalAction
self._add_reserved(
"Q",
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
)
self._add_reserved( self._add_reserved(
"B", "B",
MenuOption("Back", SignalAction("Back", BackSignal()), OneColors.DARK_YELLOW), MenuOption("Back", SignalAction("Back", BackSignal()), OneColors.DARK_YELLOW),
) )
self._add_reserved(
"X",
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
)
def _add_reserved(self, key: str, option: MenuOption) -> None: def _add_reserved(self, key: str, option: MenuOption) -> None:
"""Add a reserved key, bypassing validation.""" """Add a reserved key, bypassing validation."""
@ -78,8 +86,20 @@ class MenuOptionMap(CaseInsensitiveDict):
raise ValueError(f"Cannot delete reserved option '{key}'.") raise ValueError(f"Cannot delete reserved option '{key}'.")
super().__delitem__(key) super().__delitem__(key)
def update(self, other=None, **kwargs):
"""Update the selection options with another dictionary."""
if other:
for key, option in other.items():
if not isinstance(option, MenuOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
for key, option in kwargs.items():
if not isinstance(option, MenuOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
def items(self, include_reserved: bool = True): def items(self, include_reserved: bool = True):
for k, v in super().items(): for key, option in super().items():
if not include_reserved and k in self.RESERVED_KEYS: if not include_reserved and key in self.RESERVED_KEYS:
continue continue
yield k, v yield key, option

View File

@ -7,8 +7,6 @@ Licensed under the MIT License. See LICENSE file for details.
from .argparse import Argument, ArgumentAction, CommandArgumentParser from .argparse import Argument, ArgumentAction, CommandArgumentParser
from .parsers import FalyxParsers, get_arg_parsers from .parsers import FalyxParsers, get_arg_parsers
from .signature import infer_args_from_func
from .utils import same_argument_definitions
__all__ = [ __all__ = [
"Argument", "Argument",
@ -16,6 +14,4 @@ __all__ = [
"CommandArgumentParser", "CommandArgumentParser",
"get_arg_parsers", "get_arg_parsers",
"FalyxParsers", "FalyxParsers",
"infer_args_from_func",
"same_argument_definitions",
] ]

View File

@ -1,4 +1,6 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed # Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
from __future__ import annotations
from copy import deepcopy from copy import deepcopy
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
@ -23,12 +25,21 @@ class ArgumentAction(Enum):
COUNT = "count" COUNT = "count"
HELP = "help" HELP = "help"
@classmethod
def choices(cls) -> list[ArgumentAction]:
"""Return a list of all argument actions."""
return list(cls)
def __str__(self) -> str:
"""Return the string representation of the argument action."""
return self.value
@dataclass @dataclass
class Argument: class Argument:
"""Represents a command-line argument.""" """Represents a command-line argument."""
flags: list[str] flags: tuple[str, ...]
dest: str # Destination name for the argument dest: str # Destination name for the argument
action: ArgumentAction = ( action: ArgumentAction = (
ArgumentAction.STORE ArgumentAction.STORE
@ -38,7 +49,7 @@ class Argument:
choices: list[str] | None = None # List of valid choices for the argument choices: list[str] | None = None # List of valid choices for the argument
required: bool = False # True if the argument is required required: bool = False # True if the argument is required
help: str = "" # Help text for the argument help: str = "" # Help text for the argument
nargs: int | str = 1 # int, '?', '*', '+' nargs: int | str | None = None # int, '?', '*', '+', None
positional: bool = False # True if no leading - or -- in flags positional: bool = False # True if no leading - or -- in flags
def get_positional_text(self) -> str: def get_positional_text(self) -> str:
@ -66,7 +77,11 @@ class Argument:
and not self.positional and not self.positional
): ):
choice_text = self.dest.upper() choice_text = self.dest.upper()
elif isinstance(self.nargs, str): elif self.action in (
ArgumentAction.STORE,
ArgumentAction.APPEND,
ArgumentAction.EXTEND,
) or isinstance(self.nargs, str):
choice_text = self.dest choice_text = self.dest
if self.nargs == "?": if self.nargs == "?":
@ -136,6 +151,7 @@ class CommandArgumentParser:
aliases: list[str] | None = None, aliases: list[str] | None = None,
) -> None: ) -> None:
"""Initialize the CommandArgumentParser.""" """Initialize the CommandArgumentParser."""
self.console = Console(color_system="auto")
self.command_key: str = command_key self.command_key: str = command_key
self.command_description: str = command_description self.command_description: str = command_description
self.command_style: str = command_style self.command_style: str = command_style
@ -148,7 +164,6 @@ class CommandArgumentParser:
self._flag_map: dict[str, Argument] = {} self._flag_map: dict[str, Argument] = {}
self._dest_set: set[str] = set() self._dest_set: set[str] = set()
self._add_help() self._add_help()
self.console = Console(color_system="auto")
def _add_help(self): def _add_help(self):
"""Add help argument to the parser.""" """Add help argument to the parser."""
@ -170,9 +185,7 @@ class CommandArgumentParser:
raise CommandArgumentError("Positional arguments cannot have multiple flags") raise CommandArgumentError("Positional arguments cannot have multiple flags")
return positional return positional
def _get_dest_from_flags( def _get_dest_from_flags(self, flags: tuple[str, ...], dest: str | None) -> str:
self, flags: tuple[str, ...], dest: str | None
) -> str | None:
"""Convert flags to a destination name.""" """Convert flags to a destination name."""
if dest: if dest:
if not dest.replace("_", "").isalnum(): if not dest.replace("_", "").isalnum():
@ -201,7 +214,7 @@ class CommandArgumentParser:
return dest return dest
def _determine_required( def _determine_required(
self, required: bool, positional: bool, nargs: int | str self, required: bool, positional: bool, nargs: int | str | None
) -> bool: ) -> bool:
"""Determine if the argument is required.""" """Determine if the argument is required."""
if required: if required:
@ -219,7 +232,22 @@ class CommandArgumentParser:
return required return required
def _validate_nargs(self, nargs: int | str) -> int | str: def _validate_nargs(
self, nargs: int | str | None, action: ArgumentAction
) -> int | str | None:
if action in (
ArgumentAction.STORE_FALSE,
ArgumentAction.STORE_TRUE,
ArgumentAction.COUNT,
ArgumentAction.HELP,
):
if nargs is not None:
raise CommandArgumentError(
f"nargs cannot be specified for {action} actions"
)
return None
if nargs is None:
nargs = 1
allowed_nargs = ("?", "*", "+") allowed_nargs = ("?", "*", "+")
if isinstance(nargs, int): if isinstance(nargs, int):
if nargs <= 0: if nargs <= 0:
@ -231,7 +259,9 @@ class CommandArgumentParser:
raise CommandArgumentError(f"nargs must be an int or one of {allowed_nargs}") raise CommandArgumentError(f"nargs must be an int or one of {allowed_nargs}")
return nargs return nargs
def _normalize_choices(self, choices: Iterable, expected_type: Any) -> list[Any]: def _normalize_choices(
self, choices: Iterable | None, expected_type: Any
) -> list[Any]:
if choices is not None: if choices is not None:
if isinstance(choices, dict): if isinstance(choices, dict):
raise CommandArgumentError("choices cannot be a dict") raise CommandArgumentError("choices cannot be a dict")
@ -278,8 +308,34 @@ class CommandArgumentParser:
f"Default list value {default!r} for '{dest}' cannot be coerced to {expected_type.__name__}" f"Default list value {default!r} for '{dest}' cannot be coerced to {expected_type.__name__}"
) )
def _validate_action(
self, action: ArgumentAction | str, positional: bool
) -> ArgumentAction:
if not isinstance(action, ArgumentAction):
try:
action = ArgumentAction(action)
except ValueError:
raise CommandArgumentError(
f"Invalid action '{action}' is not a valid ArgumentAction"
)
if action in (
ArgumentAction.STORE_TRUE,
ArgumentAction.STORE_FALSE,
ArgumentAction.COUNT,
ArgumentAction.HELP,
):
if positional:
raise CommandArgumentError(
f"Action '{action}' cannot be used with positional arguments"
)
return action
def _resolve_default( def _resolve_default(
self, action: ArgumentAction, default: Any, nargs: str | int self,
default: Any,
action: ArgumentAction,
nargs: str | int | None,
) -> Any: ) -> Any:
"""Get the default value for the argument.""" """Get the default value for the argument."""
if default is None: if default is None:
@ -313,7 +369,18 @@ class CommandArgumentParser:
f"Flag '{flag}' must be a single character or start with '--'" f"Flag '{flag}' must be a single character or start with '--'"
) )
def add_argument(self, *flags, **kwargs): def add_argument(
self,
*flags,
action: str | ArgumentAction = "store",
nargs: int | str | None = None,
default: Any = None,
type: Any = str,
choices: Iterable | None = None,
required: bool = False,
help: str = "",
dest: str | None = None,
) -> None:
"""Add an argument to the parser. """Add an argument to the parser.
Args: Args:
name or flags: Either a name or prefixed flags (e.g. 'faylx', '-f', '--falyx'). name or flags: Either a name or prefixed flags (e.g. 'faylx', '-f', '--falyx').
@ -326,9 +393,10 @@ class CommandArgumentParser:
help: A brief description of the argument. help: A brief description of the argument.
dest: The name of the attribute to be added to the object returned by parse_args(). dest: The name of the attribute to be added to the object returned by parse_args().
""" """
expected_type = type
self._validate_flags(flags) self._validate_flags(flags)
positional = self._is_positional(flags) positional = self._is_positional(flags)
dest = self._get_dest_from_flags(flags, kwargs.get("dest")) dest = self._get_dest_from_flags(flags, dest)
if dest in self._dest_set: if dest in self._dest_set:
raise CommandArgumentError( raise CommandArgumentError(
f"Destination '{dest}' is already defined.\n" f"Destination '{dest}' is already defined.\n"
@ -336,18 +404,9 @@ class CommandArgumentParser:
"is not supported. Define a unique 'dest' for each argument." "is not supported. Define a unique 'dest' for each argument."
) )
self._dest_set.add(dest) self._dest_set.add(dest)
action = kwargs.get("action", ArgumentAction.STORE) action = self._validate_action(action, positional)
if not isinstance(action, ArgumentAction): nargs = self._validate_nargs(nargs, action)
try: default = self._resolve_default(default, action, nargs)
action = ArgumentAction(action)
except ValueError:
raise CommandArgumentError(
f"Invalid action '{action}' is not a valid ArgumentAction"
)
flags = list(flags)
nargs = self._validate_nargs(kwargs.get("nargs", 1))
default = self._resolve_default(action, kwargs.get("default"), nargs)
expected_type = kwargs.get("type", str)
if ( if (
action in (ArgumentAction.STORE, ArgumentAction.APPEND, ArgumentAction.EXTEND) action in (ArgumentAction.STORE, ArgumentAction.APPEND, ArgumentAction.EXTEND)
and default is not None and default is not None
@ -356,14 +415,12 @@ class CommandArgumentParser:
self._validate_default_list_type(default, expected_type, dest) self._validate_default_list_type(default, expected_type, dest)
else: else:
self._validate_default_type(default, expected_type, dest) self._validate_default_type(default, expected_type, dest)
choices = self._normalize_choices(kwargs.get("choices"), expected_type) choices = self._normalize_choices(choices, expected_type)
if default is not None and choices and default not in choices: if default is not None and choices and default not in choices:
raise CommandArgumentError( raise CommandArgumentError(
f"Default value '{default}' not in allowed choices: {choices}" f"Default value '{default}' not in allowed choices: {choices}"
) )
required = self._determine_required( required = self._determine_required(required, positional, nargs)
kwargs.get("required", False), positional, nargs
)
argument = Argument( argument = Argument(
flags=flags, flags=flags,
dest=dest, dest=dest,
@ -372,7 +429,7 @@ class CommandArgumentParser:
default=default, default=default,
choices=choices, choices=choices,
required=required, required=required,
help=kwargs.get("help", ""), help=help,
nargs=nargs, nargs=nargs,
positional=positional, positional=positional,
) )
@ -415,11 +472,11 @@ class CommandArgumentParser:
values = [] values = []
i = start i = start
if isinstance(spec.nargs, int): if isinstance(spec.nargs, int):
# assert i + spec.nargs <= len(
# args
# ), "Not enough arguments provided: shouldn't happen"
values = args[i : i + spec.nargs] values = args[i : i + spec.nargs]
return values, i + spec.nargs return values, i + spec.nargs
elif spec.nargs is None:
values = [args[i]]
return values, i + 1
elif spec.nargs == "+": elif spec.nargs == "+":
if i >= len(args): if i >= len(args):
raise CommandArgumentError( raise CommandArgumentError(
@ -464,6 +521,8 @@ class CommandArgumentParser:
for next_spec in positional_args[j + 1 :]: for next_spec in positional_args[j + 1 :]:
if isinstance(next_spec.nargs, int): if isinstance(next_spec.nargs, int):
min_required += next_spec.nargs min_required += next_spec.nargs
elif next_spec.nargs is None:
min_required += 1
elif next_spec.nargs == "+": elif next_spec.nargs == "+":
min_required += 1 min_required += 1
elif next_spec.nargs == "?": elif next_spec.nargs == "?":
@ -506,7 +565,7 @@ class CommandArgumentParser:
return i return i
def parse_args( async def parse_args(
self, args: list[str] | None = None, from_validate: bool = False self, args: list[str] | None = None, from_validate: bool = False
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Parse Falyx Command arguments.""" """Parse Falyx Command arguments."""
@ -654,7 +713,7 @@ class CommandArgumentParser:
result.pop("help", None) result.pop("help", None)
return result return result
def parse_args_split( async def parse_args_split(
self, args: list[str], from_validate: bool = False self, args: list[str], from_validate: bool = False
) -> tuple[tuple[Any, ...], dict[str, Any]]: ) -> tuple[tuple[Any, ...], dict[str, Any]]:
""" """
@ -662,7 +721,7 @@ class CommandArgumentParser:
tuple[args, kwargs] - Positional arguments in defined order, tuple[args, kwargs] - Positional arguments in defined order,
followed by keyword argument mapping. followed by keyword argument mapping.
""" """
parsed = self.parse_args(args, from_validate) parsed = await self.parse_args(args, from_validate)
args_list = [] args_list = []
kwargs_dict = {} kwargs_dict = {}
for arg in self._arguments: for arg in self._arguments:

View File

@ -114,7 +114,7 @@ def get_arg_parsers(
help="Skip confirmation prompts", help="Skip confirmation prompts",
) )
run_group.add_argument( run_parser.add_argument(
"command_args", "command_args",
nargs=REMAINDER, nargs=REMAINDER,
help="Arguments to pass to the command (if applicable)", help="Arguments to pass to the command (if applicable)",

View File

@ -1,17 +1,20 @@
import inspect import inspect
from typing import Any, Callable from typing import Any, Callable
from falyx import logger from falyx.logger import logger
def infer_args_from_func( def infer_args_from_func(
func: Callable[[Any], Any], func: Callable[[Any], Any] | None,
arg_metadata: dict[str, str | dict[str, Any]] | None = None, arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """
Infer argument definitions from a callable's signature. Infer argument definitions from a callable's signature.
Returns a list of kwargs suitable for CommandArgumentParser.add_argument. Returns a list of kwargs suitable for CommandArgumentParser.add_argument.
""" """
if not callable(func):
logger.debug("Provided argument is not callable: %s", func)
return []
arg_metadata = arg_metadata or {} arg_metadata = arg_metadata or {}
signature = inspect.signature(func) signature = inspect.signature(func)
arg_defs = [] arg_defs = []
@ -39,7 +42,7 @@ def infer_args_from_func(
else: else:
flags = [f"--{name.replace('_', '-')}"] flags = [f"--{name.replace('_', '-')}"]
action = "store" action = "store"
nargs: int | str = 1 nargs: int | str | None = None
if arg_type is bool: if arg_type is bool:
if param.default is False: if param.default is False:

View File

@ -1,7 +1,6 @@
from typing import Any from typing import Any
from falyx import logger from falyx import logger
from falyx.action.action import Action, ChainedAction, ProcessAction
from falyx.parsers.signature import infer_args_from_func from falyx.parsers.signature import infer_args_from_func
@ -9,17 +8,13 @@ def same_argument_definitions(
actions: list[Any], actions: list[Any],
arg_metadata: dict[str, str | dict[str, Any]] | None = None, arg_metadata: dict[str, str | dict[str, Any]] | None = None,
) -> list[dict[str, Any]] | None: ) -> list[dict[str, Any]] | None:
from falyx.action.action import BaseAction
arg_sets = [] arg_sets = []
for action in actions: for action in actions:
if isinstance(action, (Action, ProcessAction)): if isinstance(action, BaseAction):
arg_defs = infer_args_from_func(action.action, arg_metadata) infer_target, _ = action.get_infer_target()
elif isinstance(action, ChainedAction): arg_defs = infer_args_from_func(infer_target, arg_metadata)
if action.actions:
action = action.actions[0]
if isinstance(action, Action):
arg_defs = infer_args_from_func(action.action, arg_metadata)
elif callable(action):
arg_defs = infer_args_from_func(action, arg_metadata)
elif callable(action): elif callable(action):
arg_defs = infer_args_from_func(action, arg_metadata) arg_defs = infer_args_from_func(action, arg_metadata)
else: else:

View File

@ -10,7 +10,7 @@ from rich.markup import escape
from rich.table import Table from rich.table import Table
from falyx.themes import OneColors from falyx.themes import OneColors
from falyx.utils import chunks from falyx.utils import CaseInsensitiveDict, chunks
from falyx.validators import int_range_validator, key_validator from falyx.validators import int_range_validator, key_validator
@ -32,6 +32,62 @@ class SelectionOption:
return f"[{OneColors.WHITE}]{key}[/] [{self.style}]{self.description}[/]" return f"[{OneColors.WHITE}]{key}[/] [{self.style}]{self.description}[/]"
class SelectionOptionMap(CaseInsensitiveDict):
"""
Manages selection options including validation and reserved key protection.
"""
RESERVED_KEYS: set[str] = set()
def __init__(
self,
options: dict[str, SelectionOption] | None = None,
allow_reserved: bool = False,
):
super().__init__()
self.allow_reserved = allow_reserved
if options:
self.update(options)
def _add_reserved(self, key: str, option: SelectionOption) -> None:
"""Add a reserved key, bypassing validation."""
norm_key = key.upper()
super().__setitem__(norm_key, option)
def __setitem__(self, key: str, option: SelectionOption) -> None:
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
norm_key = key.upper()
if norm_key in self.RESERVED_KEYS and not self.allow_reserved:
raise ValueError(
f"Key '{key}' is reserved and cannot be used in SelectionOptionMap."
)
super().__setitem__(norm_key, option)
def __delitem__(self, key: str) -> None:
if key.upper() in self.RESERVED_KEYS and not self.allow_reserved:
raise ValueError(f"Cannot delete reserved option '{key}'.")
super().__delitem__(key)
def update(self, other=None, **kwargs):
"""Update the selection options with another dictionary."""
if other:
for key, option in other.items():
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
for key, option in kwargs.items():
if not isinstance(option, SelectionOption):
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
self[key] = option
def items(self, include_reserved: bool = True):
for k, v in super().items():
if not include_reserved and k in self.RESERVED_KEYS:
continue
yield k, v
def render_table_base( def render_table_base(
title: str, title: str,
*, *,
@ -215,7 +271,7 @@ async def prompt_for_index(
prompt_session: PromptSession | None = None, prompt_session: PromptSession | None = None,
prompt_message: str = "Select an option > ", prompt_message: str = "Select an option > ",
show_table: bool = True, show_table: bool = True,
): ) -> int:
prompt_session = prompt_session or PromptSession() prompt_session = prompt_session or PromptSession()
console = console or Console(color_system="auto") console = console or Console(color_system="auto")

View File

@ -1 +1 @@
__version__ = "0.1.29" __version__ = "0.1.35"

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "falyx" name = "falyx"
version = "0.1.29" version = "0.1.35"
description = "Reliable and introspectable async CLI action framework." description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"] authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT" license = "MIT"

View File

@ -1,7 +1,7 @@
# test_command.py # test_command.py
import pytest import pytest
from falyx.action import Action, ActionGroup, BaseIOAction, ChainedAction from falyx.action import Action, BaseIOAction, ChainedAction
from falyx.command import Command from falyx.command import Command
from falyx.execution_registry import ExecutionRegistry as er from falyx.execution_registry import ExecutionRegistry as er
from falyx.retry import RetryPolicy from falyx.retry import RetryPolicy
@ -56,102 +56,6 @@ def test_command_str():
) )
@pytest.mark.parametrize(
"action_factory, expected_requires_input",
[
(lambda: Action(name="normal", action=dummy_action), False),
(lambda: DummyInputAction(name="io"), True),
(
lambda: ChainedAction(name="chain", actions=[DummyInputAction(name="io")]),
True,
),
(
lambda: ActionGroup(name="group", actions=[DummyInputAction(name="io")]),
True,
),
],
)
def test_command_requires_input_detection(action_factory, expected_requires_input):
action = action_factory()
cmd = Command(key="TEST", description="Test Command", action=action)
assert cmd.requires_input == expected_requires_input
if expected_requires_input:
assert cmd.hidden is True
else:
assert cmd.hidden is False
def test_requires_input_flag_detected_for_baseioaction():
"""Command should automatically detect requires_input=True for BaseIOAction."""
cmd = Command(
key="X",
description="Echo input",
action=DummyInputAction(name="dummy"),
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_requires_input_manual_override():
"""Command manually set requires_input=False should not auto-hide."""
cmd = Command(
key="Y",
description="Custom input command",
action=DummyInputAction(name="dummy"),
requires_input=False,
)
assert cmd.requires_input is False
assert cmd.hidden is False
def test_default_command_does_not_require_input():
"""Normal Command without IO Action should not require input."""
cmd = Command(
key="Z",
description="Simple action",
action=lambda: 42,
)
assert cmd.requires_input is False
assert cmd.hidden is False
def test_chain_requires_input():
"""If first action in a chain requires input, the command should require input."""
chain = ChainedAction(
name="ChainWithInput",
actions=[
DummyInputAction(name="dummy"),
Action(name="action1", action=lambda: 1),
],
)
cmd = Command(
key="A",
description="Chain with input",
action=chain,
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_group_requires_input():
"""If any action in a group requires input, the command should require input."""
group = ActionGroup(
name="GroupWithInput",
actions=[
Action(name="action1", action=lambda: 1),
DummyInputAction(name="dummy"),
],
)
cmd = Command(
key="B",
description="Group with input",
action=group,
)
assert cmd.requires_input is True
assert cmd.hidden is True
def test_enable_retry(): def test_enable_retry():
"""Command should enable retry if action is an Action and retry is set to True.""" """Command should enable retry if action is an Action and retry is set to True."""
cmd = Command( cmd = Command(

View File

@ -5,98 +5,109 @@ from falyx.parsers import ArgumentAction, CommandArgumentParser
from falyx.signals import HelpSignal from falyx.signals import HelpSignal
def build_parser_and_parse(args, config): async def build_parser_and_parse(args, config):
cap = CommandArgumentParser() cap = CommandArgumentParser()
config(cap) config(cap)
return cap.parse_args(args) return await cap.parse_args(args)
def test_none(): @pytest.mark.asyncio
async def test_none():
def config(parser): def config(parser):
parser.add_argument("--foo", type=str) parser.add_argument("--foo", type=str)
parsed = build_parser_and_parse(None, config) parsed = await build_parser_and_parse(None, config)
assert parsed["foo"] is None assert parsed["foo"] is None
def test_append_multiple_flags(): @pytest.mark.asyncio
async def test_append_multiple_flags():
def config(parser): def config(parser):
parser.add_argument("--tag", action=ArgumentAction.APPEND, type=str) parser.add_argument("--tag", action=ArgumentAction.APPEND, type=str)
parsed = build_parser_and_parse(["--tag", "a", "--tag", "b", "--tag", "c"], config) parsed = await build_parser_and_parse(
["--tag", "a", "--tag", "b", "--tag", "c"], config
)
assert parsed["tag"] == ["a", "b", "c"] assert parsed["tag"] == ["a", "b", "c"]
def test_positional_nargs_plus_and_single(): @pytest.mark.asyncio
async def test_positional_nargs_plus_and_single():
def config(parser): def config(parser):
parser.add_argument("files", nargs="+", type=str) parser.add_argument("files", nargs="+", type=str)
parser.add_argument("mode", nargs=1) parser.add_argument("mode", nargs=1)
parsed = build_parser_and_parse(["a", "b", "c", "prod"], config) parsed = await build_parser_and_parse(["a", "b", "c", "prod"], config)
assert parsed["files"] == ["a", "b", "c"] assert parsed["files"] == ["a", "b", "c"]
assert parsed["mode"] == "prod" assert parsed["mode"] == "prod"
def test_type_validation_failure(): @pytest.mark.asyncio
async def test_type_validation_failure():
def config(parser): def config(parser):
parser.add_argument("--count", type=int) parser.add_argument("--count", type=int)
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
build_parser_and_parse(["--count", "abc"], config) await build_parser_and_parse(["--count", "abc"], config)
def test_required_field_missing(): @pytest.mark.asyncio
async def test_required_field_missing():
def config(parser): def config(parser):
parser.add_argument("--env", type=str, required=True) parser.add_argument("--env", type=str, required=True)
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
build_parser_and_parse([], config) await build_parser_and_parse([], config)
def test_choices_enforced(): @pytest.mark.asyncio
async def test_choices_enforced():
def config(parser): def config(parser):
parser.add_argument("--mode", choices=["dev", "prod"]) parser.add_argument("--mode", choices=["dev", "prod"])
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
build_parser_and_parse(["--mode", "staging"], config) await build_parser_and_parse(["--mode", "staging"], config)
def test_boolean_flags(): @pytest.mark.asyncio
async def test_boolean_flags():
def config(parser): def config(parser):
parser.add_argument("--debug", action=ArgumentAction.STORE_TRUE) parser.add_argument("--debug", action=ArgumentAction.STORE_TRUE)
parser.add_argument("--no-debug", action=ArgumentAction.STORE_FALSE) parser.add_argument("--no-debug", action=ArgumentAction.STORE_FALSE)
parsed = build_parser_and_parse(["--debug", "--no-debug"], config) parsed = await build_parser_and_parse(["--debug", "--no-debug"], config)
assert parsed["debug"] is True assert parsed["debug"] is True
assert parsed["no_debug"] is False assert parsed["no_debug"] is False
parsed = build_parser_and_parse([], config) parsed = await build_parser_and_parse([], config)
print(parsed)
assert parsed["debug"] is False assert parsed["debug"] is False
assert parsed["no_debug"] is True assert parsed["no_debug"] is True
def test_count_action(): @pytest.mark.asyncio
async def test_count_action():
def config(parser): def config(parser):
parser.add_argument("-v", action=ArgumentAction.COUNT) parser.add_argument("-v", action=ArgumentAction.COUNT)
parsed = build_parser_and_parse(["-v", "-v", "-v"], config) parsed = await build_parser_and_parse(["-v", "-v", "-v"], config)
assert parsed["v"] == 3 assert parsed["v"] == 3
def test_nargs_star(): @pytest.mark.asyncio
async def test_nargs_star():
def config(parser): def config(parser):
parser.add_argument("args", nargs="*", type=str) parser.add_argument("args", nargs="*", type=str)
parsed = build_parser_and_parse(["one", "two", "three"], config) parsed = await build_parser_and_parse(["one", "two", "three"], config)
assert parsed["args"] == ["one", "two", "three"] assert parsed["args"] == ["one", "two", "three"]
def test_flag_and_positional_mix(): @pytest.mark.asyncio
async def test_flag_and_positional_mix():
def config(parser): def config(parser):
parser.add_argument("--env", type=str) parser.add_argument("--env", type=str)
parser.add_argument("tasks", nargs="+") parser.add_argument("tasks", nargs="+")
parsed = build_parser_and_parse(["--env", "prod", "build", "test"], config) parsed = await build_parser_and_parse(["--env", "prod", "build", "test"], config)
assert parsed["env"] == "prod" assert parsed["env"] == "prod"
assert parsed["tasks"] == ["build", "test"] assert parsed["tasks"] == ["build", "test"]
@ -134,7 +145,7 @@ def test_add_argument_multiple_optional_flags_same_dest():
parser.add_argument("-f", "--falyx") parser.add_argument("-f", "--falyx")
arg = parser._arguments[-1] arg = parser._arguments[-1]
assert arg.dest == "falyx" assert arg.dest == "falyx"
assert arg.flags == ["-f", "--falyx"] assert arg.flags == ("-f", "--falyx")
def test_add_argument_flag_dest_conflict(): def test_add_argument_flag_dest_conflict():
@ -165,7 +176,7 @@ def test_add_argument_multiple_flags_custom_dest():
parser.add_argument("-f", "--falyx", "--test", dest="falyx") parser.add_argument("-f", "--falyx", "--test", dest="falyx")
arg = parser._arguments[-1] arg = parser._arguments[-1]
assert arg.dest == "falyx" assert arg.dest == "falyx"
assert arg.flags == ["-f", "--falyx", "--test"] assert arg.flags == ("-f", "--falyx", "--test")
def test_add_argument_multiple_flags_dest(): def test_add_argument_multiple_flags_dest():
@ -175,7 +186,7 @@ def test_add_argument_multiple_flags_dest():
parser.add_argument("-f", "--falyx", "--test") parser.add_argument("-f", "--falyx", "--test")
arg = parser._arguments[-1] arg = parser._arguments[-1]
assert arg.dest == "falyx" assert arg.dest == "falyx"
assert arg.flags == ["-f", "--falyx", "--test"] assert arg.flags == ("-f", "--falyx", "--test")
def test_add_argument_single_flag_dest(): def test_add_argument_single_flag_dest():
@ -185,7 +196,7 @@ def test_add_argument_single_flag_dest():
parser.add_argument("-f") parser.add_argument("-f")
arg = parser._arguments[-1] arg = parser._arguments[-1]
assert arg.dest == "f" assert arg.dest == "f"
assert arg.flags == ["-f"] assert arg.flags == ("-f",)
def test_add_argument_bad_dest(): def test_add_argument_bad_dest():
@ -257,7 +268,7 @@ def test_add_argument_default_value():
parser.add_argument("--falyx", default="default_value") parser.add_argument("--falyx", default="default_value")
arg = parser._arguments[-1] arg = parser._arguments[-1]
assert arg.dest == "falyx" assert arg.dest == "falyx"
assert arg.flags == ["--falyx"] assert arg.flags == ("--falyx",)
assert arg.default == "default_value" assert arg.default == "default_value"
@ -297,20 +308,21 @@ def test_add_argument_default_not_in_choices():
parser.add_argument("--falyx", choices=["a", "b"], default="c") parser.add_argument("--falyx", choices=["a", "b"], default="c")
def test_add_argument_choices(): @pytest.mark.asyncio
async def test_add_argument_choices():
parser = CommandArgumentParser() parser = CommandArgumentParser()
# ✅ Choices provided # ✅ Choices provided
parser.add_argument("--falyx", choices=["a", "b", "c"]) parser.add_argument("--falyx", choices=["a", "b", "c"])
arg = parser._arguments[-1] arg = parser._arguments[-1]
assert arg.dest == "falyx" assert arg.dest == "falyx"
assert arg.flags == ["--falyx"] assert arg.flags == ("--falyx",)
assert arg.choices == ["a", "b", "c"] assert arg.choices == ["a", "b", "c"]
args = parser.parse_args(["--falyx", "a"]) args = await parser.parse_args(["--falyx", "a"])
assert args["falyx"] == "a" assert args["falyx"] == "a"
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args(["--falyx", "d"]) await parser.parse_args(["--falyx", "d"])
def test_add_argument_choices_invalid(): def test_add_argument_choices_invalid():
@ -352,7 +364,7 @@ def test_add_argument_nargs():
parser.add_argument("--falyx", nargs=2) parser.add_argument("--falyx", nargs=2)
arg = parser._arguments[-1] arg = parser._arguments[-1]
assert arg.dest == "falyx" assert arg.dest == "falyx"
assert arg.flags == ["--falyx"] assert arg.flags == ("--falyx",)
assert arg.nargs == 2 assert arg.nargs == 2
@ -377,56 +389,60 @@ def test_get_argument():
parser.add_argument("--falyx", type=str, default="default_value") parser.add_argument("--falyx", type=str, default="default_value")
arg = parser.get_argument("falyx") arg = parser.get_argument("falyx")
assert arg.dest == "falyx" assert arg.dest == "falyx"
assert arg.flags == ["--falyx"] assert arg.flags == ("--falyx",)
assert arg.default == "default_value" assert arg.default == "default_value"
def test_parse_args_nargs(): @pytest.mark.asyncio
async def test_parse_args_nargs():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", nargs="+", type=str) parser.add_argument("files", nargs="+", type=str)
parser.add_argument("mode", nargs=1) parser.add_argument("mode", nargs=1)
args = parser.parse_args(["a", "b", "c"]) args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b"] assert args["files"] == ["a", "b"]
assert args["mode"] == "c" assert args["mode"] == "c"
def test_parse_args_nargs_plus(): @pytest.mark.asyncio
async def test_parse_args_nargs_plus():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", nargs="+", type=str) parser.add_argument("files", nargs="+", type=str)
args = parser.parse_args(["a", "b", "c"]) args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"] assert args["files"] == ["a", "b", "c"]
args = parser.parse_args(["a"]) args = await parser.parse_args(["a"])
assert args["files"] == ["a"] assert args["files"] == ["a"]
def test_parse_args_flagged_nargs_plus(): @pytest.mark.asyncio
async def test_parse_args_flagged_nargs_plus():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--files", nargs="+", type=str) parser.add_argument("--files", nargs="+", type=str)
args = parser.parse_args(["--files", "a", "b", "c"]) args = await parser.parse_args(["--files", "a", "b", "c"])
assert args["files"] == ["a", "b", "c"] assert args["files"] == ["a", "b", "c"]
args = parser.parse_args(["--files", "a"]) args = await parser.parse_args(["--files", "a"])
print(args) print(args)
assert args["files"] == ["a"] assert args["files"] == ["a"]
args = parser.parse_args([]) args = await parser.parse_args([])
assert args["files"] == [] assert args["files"] == []
def test_parse_args_numbered_nargs(): @pytest.mark.asyncio
async def test_parse_args_numbered_nargs():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", nargs=2, type=str) parser.add_argument("files", nargs=2, type=str)
args = parser.parse_args(["a", "b"]) args = await parser.parse_args(["a", "b"])
assert args["files"] == ["a", "b"] assert args["files"] == ["a", "b"]
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
args = parser.parse_args(["a"]) args = await parser.parse_args(["a"])
print(args) print(args)
@ -436,48 +452,53 @@ def test_parse_args_nargs_zero():
parser.add_argument("files", nargs=0, type=str) parser.add_argument("files", nargs=0, type=str)
def test_parse_args_nargs_more_than_expected(): @pytest.mark.asyncio
async def test_parse_args_nargs_more_than_expected():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", nargs=2, type=str) parser.add_argument("files", nargs=2, type=str)
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args(["a", "b", "c", "d"]) await parser.parse_args(["a", "b", "c", "d"])
def test_parse_args_nargs_one_or_none(): @pytest.mark.asyncio
async def test_parse_args_nargs_one_or_none():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", nargs="?", type=str) parser.add_argument("files", nargs="?", type=str)
args = parser.parse_args(["a"]) args = await parser.parse_args(["a"])
assert args["files"] == "a" assert args["files"] == "a"
args = parser.parse_args([]) args = await parser.parse_args([])
assert args["files"] is None assert args["files"] is None
def test_parse_args_nargs_positional(): @pytest.mark.asyncio
async def test_parse_args_nargs_positional():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", nargs="*", type=str) parser.add_argument("files", nargs="*", type=str)
args = parser.parse_args(["a", "b", "c"]) args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"] assert args["files"] == ["a", "b", "c"]
args = parser.parse_args([]) args = await parser.parse_args([])
assert args["files"] == [] assert args["files"] == []
def test_parse_args_nargs_positional_plus(): @pytest.mark.asyncio
async def test_parse_args_nargs_positional_plus():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", nargs="+", type=str) parser.add_argument("files", nargs="+", type=str)
args = parser.parse_args(["a", "b", "c"]) args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"] assert args["files"] == ["a", "b", "c"]
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
args = parser.parse_args([]) args = await parser.parse_args([])
def test_parse_args_nargs_multiple_positional(): @pytest.mark.asyncio
async def test_parse_args_nargs_multiple_positional():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", nargs="+", type=str) parser.add_argument("files", nargs="+", type=str)
parser.add_argument("mode", nargs=1) parser.add_argument("mode", nargs=1)
@ -485,7 +506,7 @@ def test_parse_args_nargs_multiple_positional():
parser.add_argument("target", nargs="*") parser.add_argument("target", nargs="*")
parser.add_argument("extra", nargs="+") parser.add_argument("extra", nargs="+")
args = parser.parse_args(["a", "b", "c", "d", "e"]) args = await parser.parse_args(["a", "b", "c", "d", "e"])
assert args["files"] == ["a", "b", "c"] assert args["files"] == ["a", "b", "c"]
assert args["mode"] == "d" assert args["mode"] == "d"
assert args["action"] == [] assert args["action"] == []
@ -493,186 +514,209 @@ def test_parse_args_nargs_multiple_positional():
assert args["extra"] == ["e"] assert args["extra"] == ["e"]
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args([]) await parser.parse_args([])
def test_parse_args_nargs_invalid_positional_arguments(): @pytest.mark.asyncio
async def test_parse_args_nargs_invalid_positional_arguments():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("numbers", nargs="*", type=int) parser.add_argument("numbers", nargs="*", type=int)
parser.add_argument("mode", nargs=1) parser.add_argument("mode", nargs=1)
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args(["1", "2", "c", "d"]) await parser.parse_args(["1", "2", "c", "d"])
def test_parse_args_append(): @pytest.mark.asyncio
async def test_parse_args_append():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int) parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int)
args = parser.parse_args(["--numbers", "1", "--numbers", "2", "--numbers", "3"]) args = await parser.parse_args(["--numbers", "1", "--numbers", "2", "--numbers", "3"])
assert args["numbers"] == [1, 2, 3] assert args["numbers"] == [1, 2, 3]
args = parser.parse_args(["--numbers", "1"]) args = await parser.parse_args(["--numbers", "1"])
assert args["numbers"] == [1] assert args["numbers"] == [1]
args = parser.parse_args([]) args = await parser.parse_args([])
assert args["numbers"] == [] assert args["numbers"] == []
def test_parse_args_nargs_append(): @pytest.mark.asyncio
async def test_parse_args_nargs_append():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("numbers", action=ArgumentAction.APPEND, type=int, nargs="*") parser.add_argument("numbers", action=ArgumentAction.APPEND, type=int, nargs="*")
parser.add_argument("--mode") parser.add_argument("--mode")
args = parser.parse_args(["1", "2", "3", "--mode", "numbers", "4", "5"]) args = await parser.parse_args(["1", "2", "3", "--mode", "numbers", "4", "5"])
assert args["numbers"] == [[1, 2, 3], [4, 5]] assert args["numbers"] == [[1, 2, 3], [4, 5]]
args = parser.parse_args(["1"]) args = await parser.parse_args(["1"])
assert args["numbers"] == [[1]] assert args["numbers"] == [[1]]
args = parser.parse_args([]) args = await parser.parse_args([])
assert args["numbers"] == [] assert args["numbers"] == []
def test_parse_args_append_flagged_invalid_type(): @pytest.mark.asyncio
async def test_parse_args_append_flagged_invalid_type():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int) parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int)
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args(["--numbers", "a"]) await parser.parse_args(["--numbers", "a"])
def test_append_groups_nargs(): @pytest.mark.asyncio
async def test_append_groups_nargs():
cap = CommandArgumentParser() cap = CommandArgumentParser()
cap.add_argument("--item", action=ArgumentAction.APPEND, type=str, nargs=2) cap.add_argument("--item", action=ArgumentAction.APPEND, type=str, nargs=2)
parsed = cap.parse_args(["--item", "a", "b", "--item", "c", "d"]) parsed = await cap.parse_args(["--item", "a", "b", "--item", "c", "d"])
assert parsed["item"] == [["a", "b"], ["c", "d"]] assert parsed["item"] == [["a", "b"], ["c", "d"]]
def test_extend_flattened(): @pytest.mark.asyncio
async def test_extend_flattened():
cap = CommandArgumentParser() cap = CommandArgumentParser()
cap.add_argument("--value", action=ArgumentAction.EXTEND, type=str) cap.add_argument("--value", action=ArgumentAction.EXTEND, type=str)
parsed = cap.parse_args(["--value", "x", "--value", "y"]) parsed = await cap.parse_args(["--value", "x", "--value", "y"])
assert parsed["value"] == ["x", "y"] assert parsed["value"] == ["x", "y"]
def test_parse_args_split_order(): @pytest.mark.asyncio
async def test_parse_args_split_order():
cap = CommandArgumentParser() cap = CommandArgumentParser()
cap.add_argument("a") cap.add_argument("a")
cap.add_argument("--x") cap.add_argument("--x")
cap.add_argument("b", nargs="*") cap.add_argument("b", nargs="*")
args, kwargs = cap.parse_args_split(["1", "--x", "100", "2"]) args, kwargs = await cap.parse_args_split(["1", "--x", "100", "2"])
assert args == ("1", ["2"]) assert args == ("1", ["2"])
assert kwargs == {"x": "100"} assert kwargs == {"x": "100"}
def test_help_signal_triggers(): @pytest.mark.asyncio
async def test_help_signal_triggers():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--foo") parser.add_argument("--foo")
with pytest.raises(HelpSignal): with pytest.raises(HelpSignal):
parser.parse_args(["--help"]) await parser.parse_args(["--help"])
def test_empty_parser_defaults(): @pytest.mark.asyncio
async def test_empty_parser_defaults():
parser = CommandArgumentParser() parser = CommandArgumentParser()
with pytest.raises(HelpSignal): with pytest.raises(HelpSignal):
parser.parse_args(["--help"]) await parser.parse_args(["--help"])
def test_extend_basic(): @pytest.mark.asyncio
async def test_extend_basic():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--tag", action=ArgumentAction.EXTEND, type=str) parser.add_argument("--tag", action=ArgumentAction.EXTEND, type=str)
args = parser.parse_args(["--tag", "a", "--tag", "b", "--tag", "c"]) args = await parser.parse_args(["--tag", "a", "--tag", "b", "--tag", "c"])
assert args["tag"] == ["a", "b", "c"] assert args["tag"] == ["a", "b", "c"]
def test_extend_nargs_2(): @pytest.mark.asyncio
async def test_extend_nargs_2():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--pair", action=ArgumentAction.EXTEND, type=str, nargs=2) parser.add_argument("--pair", action=ArgumentAction.EXTEND, type=str, nargs=2)
args = parser.parse_args(["--pair", "a", "b", "--pair", "c", "d"]) args = await parser.parse_args(["--pair", "a", "b", "--pair", "c", "d"])
assert args["pair"] == ["a", "b", "c", "d"] assert args["pair"] == ["a", "b", "c", "d"]
def test_extend_nargs_star(): @pytest.mark.asyncio
async def test_extend_nargs_star():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--files", action=ArgumentAction.EXTEND, type=str, nargs="*") parser.add_argument("--files", action=ArgumentAction.EXTEND, type=str, nargs="*")
args = parser.parse_args(["--files", "x", "y", "z"]) args = await parser.parse_args(["--files", "x", "y", "z"])
assert args["files"] == ["x", "y", "z"] assert args["files"] == ["x", "y", "z"]
args = parser.parse_args(["--files"]) args = await parser.parse_args(["--files"])
assert args["files"] == [] assert args["files"] == []
def test_extend_nargs_plus(): @pytest.mark.asyncio
async def test_extend_nargs_plus():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--inputs", action=ArgumentAction.EXTEND, type=int, nargs="+") parser.add_argument("--inputs", action=ArgumentAction.EXTEND, type=int, nargs="+")
args = parser.parse_args(["--inputs", "1", "2", "3", "--inputs", "4"]) args = await parser.parse_args(["--inputs", "1", "2", "3", "--inputs", "4"])
assert args["inputs"] == [1, 2, 3, 4] assert args["inputs"] == [1, 2, 3, 4]
def test_extend_invalid_type(): @pytest.mark.asyncio
async def test_extend_invalid_type():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--nums", action=ArgumentAction.EXTEND, type=int) parser.add_argument("--nums", action=ArgumentAction.EXTEND, type=int)
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args(["--nums", "a"]) await parser.parse_args(["--nums", "a"])
def test_greedy_invalid_type(): @pytest.mark.asyncio
async def test_greedy_invalid_type():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--nums", nargs="*", type=int) parser.add_argument("--nums", nargs="*", type=int)
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args(["--nums", "a"]) await parser.parse_args(["--nums", "a"])
def test_append_vs_extend_behavior(): @pytest.mark.asyncio
async def test_append_vs_extend_behavior():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2) parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2)
parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2) parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2)
args = parser.parse_args( args = await parser.parse_args(
["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3", "4"] ["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3", "4"]
) )
assert args["x"] == [["a", "b"], ["c", "d"]] assert args["x"] == [["a", "b"], ["c", "d"]]
assert args["y"] == ["1", "2", "3", "4"] assert args["y"] == ["1", "2", "3", "4"]
def test_append_vs_extend_behavior_error(): @pytest.mark.asyncio
async def test_append_vs_extend_behavior_error():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2) parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2)
parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2) parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2)
# This should raise an error because the last argument is not a valid pair # This should raise an error because the last argument is not a valid pair
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args(["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3"]) await parser.parse_args(
["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3"]
)
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args(["--x", "a", "b", "--x", "c", "--y", "1", "--y", "3", "4"]) await parser.parse_args(
["--x", "a", "b", "--x", "c", "--y", "1", "--y", "3", "4"]
)
def test_extend_positional(): @pytest.mark.asyncio
async def test_extend_positional():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="*") parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="*")
args = parser.parse_args(["a", "b", "c"]) args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"] assert args["files"] == ["a", "b", "c"]
args = parser.parse_args([]) args = await parser.parse_args([])
assert args["files"] == [] assert args["files"] == []
def test_extend_positional_nargs(): @pytest.mark.asyncio
async def test_extend_positional_nargs():
parser = CommandArgumentParser() parser = CommandArgumentParser()
parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="+") parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="+")
args = parser.parse_args(["a", "b", "c"]) args = await parser.parse_args(["a", "b", "c"])
assert args["files"] == ["a", "b", "c"] assert args["files"] == ["a", "b", "c"]
with pytest.raises(CommandArgumentError): with pytest.raises(CommandArgumentError):
parser.parse_args([]) await parser.parse_args([])