Compare commits
7 Commits
argparse-i
...
ba562168aa
Author | SHA1 | Date | |
---|---|---|---|
ba562168aa
|
|||
ddb78bd5a7
|
|||
b0c0e7dc16
|
|||
0a1ba22a3d
|
|||
b51ba87999
|
|||
3c0a81359c
|
|||
4fa6e3bf1f |
@ -6,7 +6,7 @@ from falyx.action import ActionFactoryAction, ChainedAction, HTTPAction, Selecti
|
||||
# Selection of a post ID to fetch (just an example set)
|
||||
post_selector = SelectionAction(
|
||||
name="Pick Post ID",
|
||||
selections=["1", "2", "3", "4", "5"],
|
||||
selections=["15", "25", "35", "45", "55"],
|
||||
title="Choose a Post ID to submit",
|
||||
prompt_message="Post ID > ",
|
||||
show_table=True,
|
||||
@ -14,7 +14,7 @@ post_selector = SelectionAction(
|
||||
|
||||
|
||||
# Factory that builds and executes the actual HTTP POST request
|
||||
def build_post_action(post_id) -> HTTPAction:
|
||||
async def build_post_action(post_id) -> HTTPAction:
|
||||
print(f"Building HTTPAction for Post ID: {post_id}")
|
||||
return HTTPAction(
|
||||
name=f"POST to /posts (id={post_id})",
|
||||
|
@ -24,7 +24,6 @@ cmd = Command(
|
||||
key="G",
|
||||
description="Greet someone with multiple variations.",
|
||||
action=group,
|
||||
auto_args=True,
|
||||
arg_metadata={
|
||||
"name": {
|
||||
"help": "The name of the person to greet.",
|
||||
|
@ -1,14 +1,18 @@
|
||||
import asyncio
|
||||
|
||||
from falyx import Action, Falyx
|
||||
from falyx import Action, ChainedAction, Falyx
|
||||
from falyx.utils import setup_logging
|
||||
|
||||
setup_logging()
|
||||
|
||||
|
||||
async def deploy(service: str, region: str = "us-east-1", verbose: bool = False):
|
||||
async def deploy(service: str, region: str = "us-east-1", verbose: bool = False) -> str:
|
||||
if verbose:
|
||||
print(f"Deploying {service} to {region}...")
|
||||
await asyncio.sleep(2)
|
||||
if verbose:
|
||||
print(f"{service} deployed successfully!")
|
||||
return f"{service} deployed to {region}"
|
||||
|
||||
|
||||
flx = Falyx("Deployment CLI")
|
||||
@ -21,7 +25,6 @@ flx.add_command(
|
||||
name="deploy_service",
|
||||
action=deploy,
|
||||
),
|
||||
auto_args=True,
|
||||
arg_metadata={
|
||||
"service": "Service name",
|
||||
"region": {"help": "Deployment region", "choices": ["us-east-1", "us-west-2"]},
|
||||
@ -29,4 +32,23 @@ flx.add_command(
|
||||
},
|
||||
)
|
||||
|
||||
deploy_chain = ChainedAction(
|
||||
name="DeployChain",
|
||||
actions=[
|
||||
Action(name="deploy_service", action=deploy),
|
||||
Action(
|
||||
name="notify",
|
||||
action=lambda last_result: print(f"Notification: {last_result}"),
|
||||
),
|
||||
],
|
||||
auto_inject=True,
|
||||
)
|
||||
|
||||
flx.add_command(
|
||||
key="N",
|
||||
aliases=["notify"],
|
||||
description="Deploy a service and notify.",
|
||||
action=deploy_chain,
|
||||
)
|
||||
|
||||
asyncio.run(flx.run())
|
||||
|
@ -4,7 +4,6 @@ from rich.console import Console
|
||||
|
||||
from falyx import ActionGroup, Falyx
|
||||
from falyx.action import HTTPAction
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.hooks import ResultReporter
|
||||
|
||||
console = Console()
|
||||
@ -49,7 +48,7 @@ action_group = ActionGroup(
|
||||
reporter = ResultReporter()
|
||||
|
||||
action_group.hooks.register(
|
||||
HookType.ON_SUCCESS,
|
||||
"on_success",
|
||||
reporter.report,
|
||||
)
|
||||
|
||||
|
@ -2,8 +2,16 @@ import asyncio
|
||||
import time
|
||||
|
||||
from falyx import Falyx
|
||||
from falyx.action import Action, ActionGroup, ChainedAction, MenuAction, ProcessAction
|
||||
from falyx.action import (
|
||||
Action,
|
||||
ActionGroup,
|
||||
ChainedAction,
|
||||
MenuAction,
|
||||
ProcessAction,
|
||||
PromptMenuAction,
|
||||
)
|
||||
from falyx.menu import MenuOption, MenuOptionMap
|
||||
from falyx.themes import OneColors
|
||||
|
||||
|
||||
# Basic coroutine for Action
|
||||
@ -77,20 +85,28 @@ parallel = ActionGroup(
|
||||
|
||||
process = ProcessAction(name="compute", action=heavy_computation)
|
||||
|
||||
menu_options = MenuOptionMap(
|
||||
{
|
||||
"A": MenuOption("Run basic Action", basic_action, style=OneColors.LIGHT_YELLOW),
|
||||
"C": MenuOption("Run ChainedAction", chained, style=OneColors.MAGENTA),
|
||||
"P": MenuOption("Run ActionGroup (parallel)", parallel, style=OneColors.CYAN),
|
||||
"H": MenuOption("Run ProcessAction (heavy task)", process, style=OneColors.GREEN),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# Menu setup
|
||||
|
||||
menu = MenuAction(
|
||||
name="main-menu",
|
||||
title="Choose a task to run",
|
||||
menu_options=MenuOptionMap(
|
||||
{
|
||||
"1": MenuOption("Run basic Action", basic_action),
|
||||
"2": MenuOption("Run ChainedAction", chained),
|
||||
"3": MenuOption("Run ActionGroup (parallel)", parallel),
|
||||
"4": MenuOption("Run ProcessAction (heavy task)", process),
|
||||
}
|
||||
),
|
||||
menu_options=menu_options,
|
||||
)
|
||||
|
||||
|
||||
prompt_menu = PromptMenuAction(
|
||||
name="select-user",
|
||||
menu_options=menu_options,
|
||||
)
|
||||
|
||||
flx = Falyx(
|
||||
@ -108,6 +124,13 @@ flx.add_command(
|
||||
logging_hooks=True,
|
||||
)
|
||||
|
||||
flx.add_command(
|
||||
key="P",
|
||||
description="Show Prompt Menu",
|
||||
action=prompt_menu,
|
||||
logging_hooks=True,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(flx.run())
|
||||
|
@ -3,7 +3,6 @@ import asyncio
|
||||
from falyx import Action, ActionGroup, ChainedAction
|
||||
from falyx import ExecutionRegistry as er
|
||||
from falyx import ProcessAction
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.retry import RetryHandler, RetryPolicy
|
||||
|
||||
|
||||
@ -47,7 +46,7 @@ def build_pipeline():
|
||||
checkout = Action("Checkout", checkout_code)
|
||||
analysis = ProcessAction("Static Analysis", run_static_analysis)
|
||||
tests = Action("Run Tests", flaky_tests)
|
||||
tests.hooks.register(HookType.ON_ERROR, retry_handler.retry_on_error)
|
||||
tests.hooks.register("on_error", retry_handler.retry_on_error)
|
||||
|
||||
# Parallel deploys
|
||||
deploy_group = ActionGroup(
|
||||
|
@ -1,22 +1,30 @@
|
||||
import asyncio
|
||||
|
||||
from falyx.selection import (
|
||||
SelectionOption,
|
||||
prompt_for_selection,
|
||||
render_selection_dict_table,
|
||||
)
|
||||
from falyx.action import SelectionAction
|
||||
from falyx.selection import SelectionOption
|
||||
from falyx.signals import CancelSignal
|
||||
|
||||
menu = {
|
||||
"A": SelectionOption("Run diagnostics", lambda: print("Running diagnostics...")),
|
||||
"B": SelectionOption("Deploy to staging", lambda: print("Deploying...")),
|
||||
selections = {
|
||||
"1": SelectionOption(
|
||||
description="Production", value="3bc2616e-3696-11f0-a139-089204eb86ac"
|
||||
),
|
||||
"2": SelectionOption(
|
||||
description="Staging", value="42f2cd84-3696-11f0-a139-089204eb86ac"
|
||||
),
|
||||
}
|
||||
|
||||
table = render_selection_dict_table(
|
||||
title="Main Menu",
|
||||
selections=menu,
|
||||
|
||||
select = SelectionAction(
|
||||
name="Select Deployment",
|
||||
selections=selections,
|
||||
title="Select a Deployment",
|
||||
columns=2,
|
||||
prompt_message="> ",
|
||||
return_type="value",
|
||||
show_table=True,
|
||||
)
|
||||
|
||||
key = asyncio.run(prompt_for_selection(menu.keys(), table))
|
||||
print(f"You selected: {key}")
|
||||
|
||||
menu[key.upper()].value()
|
||||
try:
|
||||
print(asyncio.run(select()))
|
||||
except CancelSignal:
|
||||
print("Selection was cancelled.")
|
||||
|
@ -3,7 +3,6 @@ import asyncio
|
||||
|
||||
from falyx import Action, ChainedAction, Falyx
|
||||
from falyx.action import ShellAction
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.hooks import ResultReporter
|
||||
from falyx.utils import setup_logging
|
||||
|
||||
@ -42,12 +41,12 @@ reporter = ResultReporter()
|
||||
|
||||
a1 = Action("a1", a1, inject_last_result=True)
|
||||
a1.hooks.register(
|
||||
HookType.ON_SUCCESS,
|
||||
"on_success",
|
||||
reporter.report,
|
||||
)
|
||||
a2 = Action("a2", a2, inject_last_result=True)
|
||||
a2.hooks.register(
|
||||
HookType.ON_SUCCESS,
|
||||
"on_success",
|
||||
reporter.report,
|
||||
)
|
||||
|
||||
|
@ -12,7 +12,6 @@ from .command import Command
|
||||
from .context import ExecutionContext, SharedContext
|
||||
from .execution_registry import ExecutionRegistry
|
||||
from .falyx import Falyx
|
||||
from .hook_manager import HookType
|
||||
|
||||
logger = logging.getLogger("falyx")
|
||||
|
||||
|
@ -18,6 +18,7 @@ from .action_factory import ActionFactoryAction
|
||||
from .http_action import HTTPAction
|
||||
from .io_action import BaseIOAction, ShellAction
|
||||
from .menu_action import MenuAction
|
||||
from .prompt_menu_action import PromptMenuAction
|
||||
from .select_file_action import SelectFileAction
|
||||
from .selection_action import SelectionAction
|
||||
from .signal_action import SignalAction
|
||||
@ -40,4 +41,5 @@ __all__ = [
|
||||
"FallbackAction",
|
||||
"LiteralInputAction",
|
||||
"UserInputAction",
|
||||
"PromptMenuAction",
|
||||
]
|
||||
|
@ -47,6 +47,7 @@ from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.hook_manager import Hook, HookManager, HookType
|
||||
from falyx.logger import logger
|
||||
from falyx.options_manager import OptionsManager
|
||||
from falyx.parsers.utils import same_argument_definitions
|
||||
from falyx.retry import RetryHandler, RetryPolicy
|
||||
from falyx.themes import OneColors
|
||||
from falyx.utils import ensure_async
|
||||
@ -61,8 +62,7 @@ class BaseAction(ABC):
|
||||
inject_last_result (bool): Whether to inject the previous action's result
|
||||
into kwargs.
|
||||
inject_into (str): The name of the kwarg key to inject the result as
|
||||
(default: 'last_result').
|
||||
_requires_injection (bool): Whether the action requires input injection.
|
||||
(default: 'last_result').
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -82,7 +82,6 @@ class BaseAction(ABC):
|
||||
self.inject_last_result: bool = inject_last_result
|
||||
self.inject_into: str = inject_into
|
||||
self._never_prompt: bool = never_prompt
|
||||
self._requires_injection: bool = False
|
||||
self._skip_in_chain: bool = False
|
||||
self.console = Console(color_system="auto")
|
||||
self.options_manager: OptionsManager | None = None
|
||||
@ -101,6 +100,14 @@ class BaseAction(ABC):
|
||||
async def preview(self, parent: Tree | None = None):
|
||||
raise NotImplementedError("preview must be implemented by subclasses")
|
||||
|
||||
@abstractmethod
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
"""
|
||||
Returns the callable to be used for argument inference.
|
||||
By default, it returns None.
|
||||
"""
|
||||
raise NotImplementedError("get_infer_target must be implemented by subclasses")
|
||||
|
||||
def set_options_manager(self, options_manager: OptionsManager) -> None:
|
||||
self.options_manager = options_manager
|
||||
|
||||
@ -154,10 +161,6 @@ class BaseAction(ABC):
|
||||
async def _write_stdout(self, data: str) -> None:
|
||||
"""Override in subclasses that produce terminal output."""
|
||||
|
||||
def requires_io_injection(self) -> bool:
|
||||
"""Checks to see if the action requires input injection."""
|
||||
return self._requires_injection
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return str(self)
|
||||
|
||||
@ -246,6 +249,13 @@ class Action(BaseAction):
|
||||
if policy.enabled:
|
||||
self.enable_retry()
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
|
||||
"""
|
||||
Returns the callable to be used for argument inference.
|
||||
By default, it returns the action itself.
|
||||
"""
|
||||
return self.action, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
combined_args = args + self.args
|
||||
combined_kwargs = self._maybe_inject_last_result({**self.kwargs, **kwargs})
|
||||
@ -477,6 +487,14 @@ class ChainedAction(BaseAction, ActionListMixin):
|
||||
if hasattr(action, "register_teardown") and callable(action.register_teardown):
|
||||
action.register_teardown(self.hooks)
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
if self.actions:
|
||||
return self.actions[0].get_infer_target()
|
||||
return None, None
|
||||
|
||||
def _clear_args(self):
|
||||
return (), {}
|
||||
|
||||
async def _run(self, *args, **kwargs) -> list[Any]:
|
||||
if not self.actions:
|
||||
raise EmptyChainError(f"[{self.name}] No actions to execute.")
|
||||
@ -505,12 +523,8 @@ class ChainedAction(BaseAction, ActionListMixin):
|
||||
continue
|
||||
shared_context.current_index = index
|
||||
prepared = action.prepare(shared_context, self.options_manager)
|
||||
last_result = shared_context.last_result()
|
||||
try:
|
||||
if self.requires_io_injection() and last_result is not None:
|
||||
result = await prepared(**{prepared.inject_into: last_result})
|
||||
else:
|
||||
result = await prepared(*args, **updated_kwargs)
|
||||
result = await prepared(*args, **updated_kwargs)
|
||||
except Exception as error:
|
||||
if index + 1 < len(self.actions) and isinstance(
|
||||
self.actions[index + 1], FallbackAction
|
||||
@ -529,6 +543,7 @@ class ChainedAction(BaseAction, ActionListMixin):
|
||||
fallback._skip_in_chain = True
|
||||
else:
|
||||
raise
|
||||
args, updated_kwargs = self._clear_args()
|
||||
shared_context.add_result(result)
|
||||
context.extra["results"].append(result)
|
||||
context.extra["rollback_stack"].append(prepared)
|
||||
@ -669,6 +684,16 @@ class ActionGroup(BaseAction, ActionListMixin):
|
||||
if hasattr(action, "register_teardown") and callable(action.register_teardown):
|
||||
action.register_teardown(self.hooks)
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
arg_defs = same_argument_definitions(self.actions)
|
||||
if arg_defs:
|
||||
return self.actions[0].get_infer_target()
|
||||
logger.debug(
|
||||
"[%s] auto_args disabled: mismatched ActionGroup arguments",
|
||||
self.name,
|
||||
)
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> list[tuple[str, Any]]:
|
||||
shared_context = SharedContext(name=self.name, action=self, is_parallel=True)
|
||||
if self.shared_context:
|
||||
@ -701,7 +726,7 @@ class ActionGroup(BaseAction, ActionListMixin):
|
||||
if context.extra["errors"]:
|
||||
context.exception = Exception(
|
||||
f"{len(context.extra['errors'])} action(s) failed: "
|
||||
f"{' ,'.join(name for name, _ in context.extra["errors"])}"
|
||||
f"{' ,'.join(name for name, _ in context.extra['errors'])}"
|
||||
)
|
||||
await self.hooks.trigger(HookType.ON_ERROR, context)
|
||||
raise context.exception
|
||||
@ -787,8 +812,11 @@ class ProcessAction(BaseAction):
|
||||
self.executor = executor or ProcessPoolExecutor()
|
||||
self.is_retryable = True
|
||||
|
||||
async def _run(self, *args, **kwargs):
|
||||
if self.inject_last_result:
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, None]:
|
||||
return self.action, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
if self.inject_last_result and self.shared_context:
|
||||
last_result = self.shared_context.last_result()
|
||||
if not self._validate_pickleable(last_result):
|
||||
raise ValueError(
|
||||
|
@ -1,6 +1,6 @@
|
||||
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
|
||||
"""action_factory.py"""
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
from rich.tree import Tree
|
||||
|
||||
@ -35,6 +35,8 @@ class ActionFactoryAction(BaseAction):
|
||||
*,
|
||||
inject_last_result: bool = False,
|
||||
inject_into: str = "last_result",
|
||||
args: tuple[Any, ...] = (),
|
||||
kwargs: dict[str, Any] | None = None,
|
||||
preview_args: tuple[Any, ...] = (),
|
||||
preview_kwargs: dict[str, Any] | None = None,
|
||||
):
|
||||
@ -44,6 +46,8 @@ class ActionFactoryAction(BaseAction):
|
||||
inject_into=inject_into,
|
||||
)
|
||||
self.factory = factory
|
||||
self.args = args
|
||||
self.kwargs = kwargs or {}
|
||||
self.preview_args = preview_args
|
||||
self.preview_kwargs = preview_kwargs or {}
|
||||
|
||||
@ -55,7 +59,12 @@ class ActionFactoryAction(BaseAction):
|
||||
def factory(self, value: ActionFactoryProtocol):
|
||||
self._factory = ensure_async(value)
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
|
||||
return self.factory, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
args = (*self.args, *args)
|
||||
kwargs = {**self.kwargs, **kwargs}
|
||||
updated_kwargs = self._maybe_inject_last_result(kwargs)
|
||||
context = ExecutionContext(
|
||||
name=f"{self.name} (factory)",
|
||||
@ -85,7 +94,7 @@ class ActionFactoryAction(BaseAction):
|
||||
)
|
||||
if self.options_manager:
|
||||
generated_action.set_options_manager(self.options_manager)
|
||||
context.result = await generated_action(*args, **kwargs)
|
||||
context.result = await generated_action()
|
||||
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
||||
return context.result
|
||||
except Exception as error:
|
||||
|
@ -19,7 +19,7 @@ import asyncio
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
from rich.tree import Tree
|
||||
|
||||
@ -73,7 +73,6 @@ class BaseIOAction(BaseAction):
|
||||
inject_last_result=inject_last_result,
|
||||
)
|
||||
self.mode = mode
|
||||
self._requires_injection = True
|
||||
|
||||
def from_input(self, raw: str | bytes) -> Any:
|
||||
raise NotImplementedError
|
||||
@ -81,15 +80,15 @@ class BaseIOAction(BaseAction):
|
||||
def to_output(self, result: Any) -> str | bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
async def _resolve_input(self, kwargs: dict[str, Any]) -> str | bytes:
|
||||
last_result = kwargs.pop(self.inject_into, None)
|
||||
|
||||
async def _resolve_input(
|
||||
self, args: tuple[Any], kwargs: dict[str, Any]
|
||||
) -> str | bytes:
|
||||
data = await self._read_stdin()
|
||||
if data:
|
||||
return self.from_input(data)
|
||||
|
||||
if last_result is not None:
|
||||
return last_result
|
||||
if len(args) == 1:
|
||||
return self.from_input(args[0])
|
||||
|
||||
if self.inject_last_result and self.shared_context:
|
||||
return self.shared_context.last_result()
|
||||
@ -99,6 +98,9 @@ class BaseIOAction(BaseAction):
|
||||
)
|
||||
raise FalyxError("No input provided and no last result to inject.")
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
return None, None
|
||||
|
||||
async def __call__(self, *args, **kwargs):
|
||||
context = ExecutionContext(
|
||||
name=self.name,
|
||||
@ -117,8 +119,8 @@ class BaseIOAction(BaseAction):
|
||||
pass
|
||||
result = getattr(self, "_last_result", None)
|
||||
else:
|
||||
parsed_input = await self._resolve_input(kwargs)
|
||||
result = await self._run(parsed_input, *args, **kwargs)
|
||||
parsed_input = await self._resolve_input(args, kwargs)
|
||||
result = await self._run(parsed_input)
|
||||
output = self.to_output(result)
|
||||
await self._write_stdout(output)
|
||||
context.result = result
|
||||
@ -195,7 +197,6 @@ class ShellAction(BaseIOAction):
|
||||
- Captures stdout and stderr from shell execution
|
||||
- Raises on non-zero exit codes with stderr as the error
|
||||
- Result is returned as trimmed stdout string
|
||||
- Compatible with ChainedAction and Command.requires_input detection
|
||||
|
||||
Args:
|
||||
name (str): Name of the action.
|
||||
@ -220,6 +221,11 @@ class ShellAction(BaseIOAction):
|
||||
)
|
||||
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
if sys.stdin.isatty():
|
||||
return self._run, {"parsed_input": {"help": self.command_template}}
|
||||
return None, None
|
||||
|
||||
async def _run(self, parsed_input: str) -> str:
|
||||
# Replace placeholder in template, or use raw input as full command
|
||||
command = self.command_template.format(parsed_input)
|
||||
|
@ -73,6 +73,9 @@ class MenuAction(BaseAction):
|
||||
table.add_row(*row)
|
||||
return table
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
kwargs = self._maybe_inject_last_result(kwargs)
|
||||
context = ExecutionContext(
|
||||
|
134
falyx/action/prompt_menu_action.py
Normal file
134
falyx/action/prompt_menu_action.py
Normal file
@ -0,0 +1,134 @@
|
||||
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
|
||||
"""prompt_menu_action.py"""
|
||||
from typing import Any
|
||||
|
||||
from prompt_toolkit import PromptSession
|
||||
from prompt_toolkit.formatted_text import FormattedText, merge_formatted_text
|
||||
from rich.console import Console
|
||||
from rich.tree import Tree
|
||||
|
||||
from falyx.action.action import BaseAction
|
||||
from falyx.context import ExecutionContext
|
||||
from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.logger import logger
|
||||
from falyx.menu import MenuOptionMap
|
||||
from falyx.signals import BackSignal, QuitSignal
|
||||
from falyx.themes import OneColors
|
||||
|
||||
|
||||
class PromptMenuAction(BaseAction):
|
||||
"""PromptMenuAction class for creating prompt -> actions."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
menu_options: MenuOptionMap,
|
||||
*,
|
||||
prompt_message: str = "Select > ",
|
||||
default_selection: str = "",
|
||||
inject_last_result: bool = False,
|
||||
inject_into: str = "last_result",
|
||||
console: Console | None = None,
|
||||
prompt_session: PromptSession | None = None,
|
||||
never_prompt: bool = False,
|
||||
include_reserved: bool = True,
|
||||
):
|
||||
super().__init__(
|
||||
name,
|
||||
inject_last_result=inject_last_result,
|
||||
inject_into=inject_into,
|
||||
never_prompt=never_prompt,
|
||||
)
|
||||
self.menu_options = menu_options
|
||||
self.prompt_message = prompt_message
|
||||
self.default_selection = default_selection
|
||||
self.console = console or Console(color_system="auto")
|
||||
self.prompt_session = prompt_session or PromptSession()
|
||||
self.include_reserved = include_reserved
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
kwargs = self._maybe_inject_last_result(kwargs)
|
||||
context = ExecutionContext(
|
||||
name=self.name,
|
||||
args=args,
|
||||
kwargs=kwargs,
|
||||
action=self,
|
||||
)
|
||||
|
||||
effective_default = self.default_selection
|
||||
maybe_result = str(self.last_result)
|
||||
if maybe_result in self.menu_options:
|
||||
effective_default = maybe_result
|
||||
elif self.inject_last_result:
|
||||
logger.warning(
|
||||
"[%s] Injected last result '%s' not found in menu options",
|
||||
self.name,
|
||||
maybe_result,
|
||||
)
|
||||
|
||||
if self.never_prompt and not effective_default:
|
||||
raise ValueError(
|
||||
f"[{self.name}] 'never_prompt' is True but no valid default_selection"
|
||||
" was provided."
|
||||
)
|
||||
|
||||
context.start_timer()
|
||||
try:
|
||||
await self.hooks.trigger(HookType.BEFORE, context)
|
||||
key = effective_default
|
||||
if not self.never_prompt:
|
||||
placeholder_formatted_text = []
|
||||
for index, (key, option) in enumerate(self.menu_options.items()):
|
||||
placeholder_formatted_text.append(option.render_prompt(key))
|
||||
if index < len(self.menu_options) - 1:
|
||||
placeholder_formatted_text.append(
|
||||
FormattedText([(OneColors.WHITE, " | ")])
|
||||
)
|
||||
placeholder = merge_formatted_text(placeholder_formatted_text)
|
||||
key = await self.prompt_session.prompt_async(
|
||||
message=self.prompt_message, placeholder=placeholder
|
||||
)
|
||||
option = self.menu_options[key]
|
||||
result = await option.action(*args, **kwargs)
|
||||
context.result = result
|
||||
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
||||
return result
|
||||
|
||||
except BackSignal:
|
||||
logger.debug("[%s][BackSignal] ← Returning to previous menu", self.name)
|
||||
return None
|
||||
except QuitSignal:
|
||||
logger.debug("[%s][QuitSignal] ← Exiting application", self.name)
|
||||
raise
|
||||
except Exception as error:
|
||||
context.exception = error
|
||||
await self.hooks.trigger(HookType.ON_ERROR, context)
|
||||
raise
|
||||
finally:
|
||||
context.stop_timer()
|
||||
await self.hooks.trigger(HookType.AFTER, context)
|
||||
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
|
||||
er.record(context)
|
||||
|
||||
async def preview(self, parent: Tree | None = None):
|
||||
label = f"[{OneColors.LIGHT_YELLOW_b}]📋 PromptMenuAction[/] '{self.name}'"
|
||||
tree = parent.add(label) if parent else Tree(label)
|
||||
for key, option in self.menu_options.items():
|
||||
tree.add(
|
||||
f"[dim]{key}[/]: {option.description} → [italic]{option.action.name}[/]"
|
||||
)
|
||||
await option.action.preview(parent=tree)
|
||||
if not parent:
|
||||
self.console.print(tree)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return (
|
||||
f"PromptMenuAction(name={self.name!r}, options={list(self.menu_options.keys())!r}, "
|
||||
f"default_selection={self.default_selection!r}, "
|
||||
f"include_reserved={self.include_reserved}, "
|
||||
f"prompt={'off' if self.never_prompt else 'on'})"
|
||||
)
|
@ -25,6 +25,7 @@ from falyx.selection import (
|
||||
prompt_for_selection,
|
||||
render_selection_dict_table,
|
||||
)
|
||||
from falyx.signals import CancelSignal
|
||||
from falyx.themes import OneColors
|
||||
|
||||
|
||||
@ -121,6 +122,16 @@ class SelectFileAction(BaseAction):
|
||||
logger.warning("[ERROR] Failed to parse %s: %s", file.name, error)
|
||||
return options
|
||||
|
||||
def _find_cancel_key(self, options) -> str:
|
||||
"""Return first numeric value not already used in the selection dict."""
|
||||
for index in range(len(options)):
|
||||
if str(index) not in options:
|
||||
return str(index)
|
||||
return str(len(options))
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
context = ExecutionContext(name=self.name, args=args, kwargs=kwargs, action=self)
|
||||
context.start_timer()
|
||||
@ -128,28 +139,38 @@ class SelectFileAction(BaseAction):
|
||||
await self.hooks.trigger(HookType.BEFORE, context)
|
||||
|
||||
files = [
|
||||
f
|
||||
for f in self.directory.iterdir()
|
||||
if f.is_file()
|
||||
and (self.suffix_filter is None or f.suffix == self.suffix_filter)
|
||||
file
|
||||
for file in self.directory.iterdir()
|
||||
if file.is_file()
|
||||
and (self.suffix_filter is None or file.suffix == self.suffix_filter)
|
||||
]
|
||||
if not files:
|
||||
raise FileNotFoundError("No files found in directory.")
|
||||
|
||||
options = self.get_options(files)
|
||||
|
||||
cancel_key = self._find_cancel_key(options)
|
||||
cancel_option = {
|
||||
cancel_key: SelectionOption(
|
||||
description="Cancel", value=CancelSignal(), style=OneColors.DARK_RED
|
||||
)
|
||||
}
|
||||
|
||||
table = render_selection_dict_table(
|
||||
title=self.title, selections=options, columns=self.columns
|
||||
title=self.title, selections=options | cancel_option, columns=self.columns
|
||||
)
|
||||
|
||||
key = await prompt_for_selection(
|
||||
options.keys(),
|
||||
(options | cancel_option).keys(),
|
||||
table,
|
||||
console=self.console,
|
||||
prompt_session=self.prompt_session,
|
||||
prompt_message=self.prompt_message,
|
||||
)
|
||||
|
||||
if key == cancel_key:
|
||||
raise CancelSignal("User canceled the selection.")
|
||||
|
||||
result = options[key].value
|
||||
context.result = result
|
||||
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
||||
@ -176,11 +197,11 @@ class SelectFileAction(BaseAction):
|
||||
try:
|
||||
files = list(self.directory.iterdir())
|
||||
if self.suffix_filter:
|
||||
files = [f for f in files if f.suffix == self.suffix_filter]
|
||||
files = [file for file in files if file.suffix == self.suffix_filter]
|
||||
sample = files[:10]
|
||||
file_list = tree.add("[dim]Files:[/]")
|
||||
for f in sample:
|
||||
file_list.add(f"[dim]{f.name}[/]")
|
||||
for file in sample:
|
||||
file_list.add(f"[dim]{file.name}[/]")
|
||||
if len(files) > 10:
|
||||
file_list.add(f"[dim]... ({len(files) - 10} more)[/]")
|
||||
except Exception as error:
|
||||
|
@ -1,5 +1,6 @@
|
||||
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
|
||||
"""selection_action.py"""
|
||||
from copy import copy
|
||||
from typing import Any
|
||||
|
||||
from prompt_toolkit import PromptSession
|
||||
@ -7,19 +8,21 @@ from rich.console import Console
|
||||
from rich.tree import Tree
|
||||
|
||||
from falyx.action.action import BaseAction
|
||||
from falyx.action.types import SelectionReturnType
|
||||
from falyx.context import ExecutionContext
|
||||
from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.logger import logger
|
||||
from falyx.selection import (
|
||||
SelectionOption,
|
||||
SelectionOptionMap,
|
||||
prompt_for_index,
|
||||
prompt_for_selection,
|
||||
render_selection_dict_table,
|
||||
render_selection_indexed_table,
|
||||
)
|
||||
from falyx.signals import CancelSignal
|
||||
from falyx.themes import OneColors
|
||||
from falyx.utils import CaseInsensitiveDict
|
||||
|
||||
|
||||
class SelectionAction(BaseAction):
|
||||
@ -34,7 +37,13 @@ class SelectionAction(BaseAction):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
selections: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption],
|
||||
selections: (
|
||||
list[str]
|
||||
| set[str]
|
||||
| tuple[str, ...]
|
||||
| dict[str, SelectionOption]
|
||||
| dict[str, Any]
|
||||
),
|
||||
*,
|
||||
title: str = "Select an option",
|
||||
columns: int = 5,
|
||||
@ -42,7 +51,7 @@ class SelectionAction(BaseAction):
|
||||
default_selection: str = "",
|
||||
inject_last_result: bool = False,
|
||||
inject_into: str = "last_result",
|
||||
return_key: bool = False,
|
||||
return_type: SelectionReturnType | str = "value",
|
||||
console: Console | None = None,
|
||||
prompt_session: PromptSession | None = None,
|
||||
never_prompt: bool = False,
|
||||
@ -55,8 +64,8 @@ class SelectionAction(BaseAction):
|
||||
never_prompt=never_prompt,
|
||||
)
|
||||
# Setter normalizes to correct type, mypy can't infer that
|
||||
self.selections: list[str] | CaseInsensitiveDict = selections # type: ignore[assignment]
|
||||
self.return_key = return_key
|
||||
self.selections: list[str] | SelectionOptionMap = selections # type: ignore[assignment]
|
||||
self.return_type: SelectionReturnType = self._coerce_return_type(return_type)
|
||||
self.title = title
|
||||
self.columns = columns
|
||||
self.console = console or Console(color_system="auto")
|
||||
@ -64,9 +73,17 @@ class SelectionAction(BaseAction):
|
||||
self.default_selection = default_selection
|
||||
self.prompt_message = prompt_message
|
||||
self.show_table = show_table
|
||||
self.cancel_key = self._find_cancel_key()
|
||||
|
||||
def _coerce_return_type(
|
||||
self, return_type: SelectionReturnType | str
|
||||
) -> SelectionReturnType:
|
||||
if isinstance(return_type, SelectionReturnType):
|
||||
return return_type
|
||||
return SelectionReturnType(return_type)
|
||||
|
||||
@property
|
||||
def selections(self) -> list[str] | CaseInsensitiveDict:
|
||||
def selections(self) -> list[str] | SelectionOptionMap:
|
||||
return self._selections
|
||||
|
||||
@selections.setter
|
||||
@ -74,17 +91,69 @@ class SelectionAction(BaseAction):
|
||||
self, value: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption]
|
||||
):
|
||||
if isinstance(value, (list, tuple, set)):
|
||||
self._selections: list[str] | CaseInsensitiveDict = list(value)
|
||||
self._selections: list[str] | SelectionOptionMap = list(value)
|
||||
elif isinstance(value, dict):
|
||||
cid = CaseInsensitiveDict()
|
||||
cid.update(value)
|
||||
self._selections = cid
|
||||
som = SelectionOptionMap()
|
||||
if all(isinstance(key, str) for key in value) and all(
|
||||
not isinstance(value[key], SelectionOption) for key in value
|
||||
):
|
||||
som.update(
|
||||
{
|
||||
str(index): SelectionOption(key, option)
|
||||
for index, (key, option) in enumerate(value.items())
|
||||
}
|
||||
)
|
||||
elif all(isinstance(key, str) for key in value) and all(
|
||||
isinstance(value[key], SelectionOption) for key in value
|
||||
):
|
||||
som.update(value)
|
||||
else:
|
||||
raise ValueError("Invalid dictionary format. Keys must be strings")
|
||||
self._selections = som
|
||||
else:
|
||||
raise TypeError(
|
||||
"'selections' must be a list[str] or dict[str, SelectionOption], "
|
||||
f"got {type(value).__name__}"
|
||||
)
|
||||
|
||||
def _find_cancel_key(self) -> str:
|
||||
"""Find the cancel key in the selections."""
|
||||
if isinstance(self.selections, dict):
|
||||
for index in range(len(self.selections) + 1):
|
||||
if str(index) not in self.selections:
|
||||
return str(index)
|
||||
return str(len(self.selections))
|
||||
|
||||
@property
|
||||
def cancel_key(self) -> str:
|
||||
return self._cancel_key
|
||||
|
||||
@cancel_key.setter
|
||||
def cancel_key(self, value: str) -> None:
|
||||
"""Set the cancel key for the selection."""
|
||||
if not isinstance(value, str):
|
||||
raise TypeError("Cancel key must be a string.")
|
||||
if isinstance(self.selections, dict) and value in self.selections:
|
||||
raise ValueError(
|
||||
"Cancel key cannot be one of the selection keys. "
|
||||
f"Current selections: {self.selections}"
|
||||
)
|
||||
if isinstance(self.selections, list):
|
||||
if not value.isdigit() or int(value) > len(self.selections):
|
||||
raise ValueError(
|
||||
"cancel_key must be a digit and not greater than the number of selections."
|
||||
)
|
||||
self._cancel_key = value
|
||||
|
||||
def cancel_formatter(self, index: int, selection: str) -> str:
|
||||
"""Format the cancel option for display."""
|
||||
if self.cancel_key == str(index):
|
||||
return f"[{index}] [{OneColors.DARK_RED}]Cancel[/]"
|
||||
return f"[{index}] {selection}"
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
kwargs = self._maybe_inject_last_result(kwargs)
|
||||
context = ExecutionContext(
|
||||
@ -125,16 +194,18 @@ class SelectionAction(BaseAction):
|
||||
|
||||
context.start_timer()
|
||||
try:
|
||||
self.cancel_key = self._find_cancel_key()
|
||||
await self.hooks.trigger(HookType.BEFORE, context)
|
||||
if isinstance(self.selections, list):
|
||||
table = render_selection_indexed_table(
|
||||
title=self.title,
|
||||
selections=self.selections,
|
||||
selections=self.selections + ["Cancel"],
|
||||
columns=self.columns,
|
||||
formatter=self.cancel_formatter,
|
||||
)
|
||||
if not self.never_prompt:
|
||||
index = await prompt_for_index(
|
||||
len(self.selections) - 1,
|
||||
index: int | str = await prompt_for_index(
|
||||
len(self.selections),
|
||||
table,
|
||||
default_selection=effective_default,
|
||||
console=self.console,
|
||||
@ -144,14 +215,23 @@ class SelectionAction(BaseAction):
|
||||
)
|
||||
else:
|
||||
index = effective_default
|
||||
result = self.selections[int(index)]
|
||||
if int(index) == int(self.cancel_key):
|
||||
raise CancelSignal("User cancelled the selection.")
|
||||
result: Any = self.selections[int(index)]
|
||||
elif isinstance(self.selections, dict):
|
||||
cancel_option = {
|
||||
self.cancel_key: SelectionOption(
|
||||
description="Cancel", value=CancelSignal, style=OneColors.DARK_RED
|
||||
)
|
||||
}
|
||||
table = render_selection_dict_table(
|
||||
title=self.title, selections=self.selections, columns=self.columns
|
||||
title=self.title,
|
||||
selections=self.selections | cancel_option,
|
||||
columns=self.columns,
|
||||
)
|
||||
if not self.never_prompt:
|
||||
key = await prompt_for_selection(
|
||||
self.selections.keys(),
|
||||
(self.selections | cancel_option).keys(),
|
||||
table,
|
||||
default_selection=effective_default,
|
||||
console=self.console,
|
||||
@ -161,10 +241,25 @@ class SelectionAction(BaseAction):
|
||||
)
|
||||
else:
|
||||
key = effective_default
|
||||
result = key if self.return_key else self.selections[key].value
|
||||
if key == self.cancel_key:
|
||||
raise CancelSignal("User cancelled the selection.")
|
||||
if self.return_type == SelectionReturnType.KEY:
|
||||
result = key
|
||||
elif self.return_type == SelectionReturnType.VALUE:
|
||||
result = self.selections[key].value
|
||||
elif self.return_type == SelectionReturnType.ITEMS:
|
||||
result = {key: self.selections[key]}
|
||||
elif self.return_type == SelectionReturnType.DESCRIPTION:
|
||||
result = self.selections[key].description
|
||||
elif self.return_type == SelectionReturnType.DESCRIPTION_VALUE:
|
||||
result = {
|
||||
self.selections[key].description: self.selections[key].value
|
||||
}
|
||||
else:
|
||||
raise ValueError(f"Unsupported return type: {self.return_type}")
|
||||
else:
|
||||
raise TypeError(
|
||||
"'selections' must be a list[str] or dict[str, tuple[str, Any]], "
|
||||
"'selections' must be a list[str] or dict[str, Any], "
|
||||
f"got {type(self.selections).__name__}"
|
||||
)
|
||||
context.result = result
|
||||
@ -203,7 +298,7 @@ class SelectionAction(BaseAction):
|
||||
return
|
||||
|
||||
tree.add(f"[dim]Default:[/] '{self.default_selection or self.last_result}'")
|
||||
tree.add(f"[dim]Return:[/] {'Key' if self.return_key else 'Value'}")
|
||||
tree.add(f"[dim]Return:[/] {self.return_type.name.capitalize()}")
|
||||
tree.add(f"[dim]Prompt:[/] {'Disabled' if self.never_prompt else 'Enabled'}")
|
||||
|
||||
if not parent:
|
||||
@ -218,6 +313,6 @@ class SelectionAction(BaseAction):
|
||||
return (
|
||||
f"SelectionAction(name={self.name!r}, type={selection_type}, "
|
||||
f"default_selection={self.default_selection!r}, "
|
||||
f"return_key={self.return_key}, "
|
||||
f"return_type={self.return_type!r}, "
|
||||
f"prompt={'off' if self.never_prompt else 'on'})"
|
||||
)
|
||||
|
@ -35,3 +35,18 @@ class FileReturnType(Enum):
|
||||
return member
|
||||
valid = ", ".join(member.value for member in cls)
|
||||
raise ValueError(f"Invalid FileReturnType: '{value}'. Must be one of: {valid}")
|
||||
|
||||
|
||||
class SelectionReturnType(Enum):
|
||||
"""Enum for dictionary return types."""
|
||||
|
||||
KEY = "key"
|
||||
VALUE = "value"
|
||||
DESCRIPTION = "description"
|
||||
DESCRIPTION_VALUE = "description_value"
|
||||
ITEMS = "items"
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value: object) -> SelectionReturnType:
|
||||
valid = ", ".join(member.value for member in cls)
|
||||
raise ValueError(f"Invalid DictReturnType: '{value}'. Must be one of: {valid}")
|
||||
|
@ -43,6 +43,9 @@ class UserInputAction(BaseAction):
|
||||
self.console = console or Console(color_system="auto")
|
||||
self.prompt_session = prompt_session or PromptSession()
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> str:
|
||||
context = ExecutionContext(
|
||||
name=self.name,
|
||||
|
@ -19,7 +19,6 @@ in building robust interactive menus.
|
||||
from __future__ import annotations
|
||||
|
||||
import shlex
|
||||
from functools import cached_property
|
||||
from typing import Any, Callable
|
||||
|
||||
from prompt_toolkit.formatted_text import FormattedText
|
||||
@ -27,25 +26,15 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
|
||||
from rich.console import Console
|
||||
from rich.tree import Tree
|
||||
|
||||
from falyx.action.action import (
|
||||
Action,
|
||||
ActionGroup,
|
||||
BaseAction,
|
||||
ChainedAction,
|
||||
ProcessAction,
|
||||
)
|
||||
from falyx.action.io_action import BaseIOAction
|
||||
from falyx.action.action import Action, BaseAction
|
||||
from falyx.context import ExecutionContext
|
||||
from falyx.debug import register_debug_hooks
|
||||
from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.hook_manager import HookManager, HookType
|
||||
from falyx.logger import logger
|
||||
from falyx.options_manager import OptionsManager
|
||||
from falyx.parsers import (
|
||||
CommandArgumentParser,
|
||||
infer_args_from_func,
|
||||
same_argument_definitions,
|
||||
)
|
||||
from falyx.parsers.argparse import CommandArgumentParser
|
||||
from falyx.parsers.signature import infer_args_from_func
|
||||
from falyx.prompt_utils import confirm_async, should_prompt_user
|
||||
from falyx.protocols import ArgParserProtocol
|
||||
from falyx.retry import RetryPolicy
|
||||
@ -99,7 +88,6 @@ class Command(BaseModel):
|
||||
retry_policy (RetryPolicy): Retry behavior configuration.
|
||||
tags (list[str]): Organizational tags for the command.
|
||||
logging_hooks (bool): Whether to attach logging hooks automatically.
|
||||
requires_input (bool | None): Indicates if the action needs input.
|
||||
options_manager (OptionsManager): Manages global command-line options.
|
||||
arg_parser (CommandArgumentParser): Parses command arguments.
|
||||
custom_parser (ArgParserProtocol | None): Custom argument parser.
|
||||
@ -116,7 +104,7 @@ class Command(BaseModel):
|
||||
|
||||
key: str
|
||||
description: str
|
||||
action: BaseAction | Callable[[Any], Any]
|
||||
action: BaseAction | Callable[..., Any]
|
||||
args: tuple = ()
|
||||
kwargs: dict[str, Any] = Field(default_factory=dict)
|
||||
hidden: bool = False
|
||||
@ -138,24 +126,23 @@ class Command(BaseModel):
|
||||
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
|
||||
tags: list[str] = Field(default_factory=list)
|
||||
logging_hooks: bool = False
|
||||
requires_input: bool | None = None
|
||||
options_manager: OptionsManager = Field(default_factory=OptionsManager)
|
||||
arg_parser: CommandArgumentParser = Field(default_factory=CommandArgumentParser)
|
||||
arguments: list[dict[str, Any]] = Field(default_factory=list)
|
||||
argument_config: Callable[[CommandArgumentParser], None] | None = None
|
||||
custom_parser: ArgParserProtocol | None = None
|
||||
custom_help: Callable[[], str | None] | None = None
|
||||
auto_args: bool = False
|
||||
auto_args: bool = True
|
||||
arg_metadata: dict[str, str | dict[str, Any]] = Field(default_factory=dict)
|
||||
|
||||
_context: ExecutionContext | None = PrivateAttr(default=None)
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
def parse_args(
|
||||
async def parse_args(
|
||||
self, raw_args: list[str] | str, from_validate: bool = False
|
||||
) -> tuple[tuple, dict]:
|
||||
if self.custom_parser:
|
||||
if callable(self.custom_parser):
|
||||
if isinstance(raw_args, str):
|
||||
try:
|
||||
raw_args = shlex.split(raw_args)
|
||||
@ -178,7 +165,9 @@ class Command(BaseModel):
|
||||
raw_args,
|
||||
)
|
||||
return ((), {})
|
||||
return self.arg_parser.parse_args_split(raw_args, from_validate=from_validate)
|
||||
return await self.arg_parser.parse_args_split(
|
||||
raw_args, from_validate=from_validate
|
||||
)
|
||||
|
||||
@field_validator("action", mode="before")
|
||||
@classmethod
|
||||
@ -192,28 +181,15 @@ class Command(BaseModel):
|
||||
def get_argument_definitions(self) -> list[dict[str, Any]]:
|
||||
if self.arguments:
|
||||
return self.arguments
|
||||
elif self.argument_config:
|
||||
elif callable(self.argument_config):
|
||||
self.argument_config(self.arg_parser)
|
||||
elif self.auto_args:
|
||||
if isinstance(self.action, (Action, ProcessAction)):
|
||||
return infer_args_from_func(self.action.action, self.arg_metadata)
|
||||
elif isinstance(self.action, ChainedAction):
|
||||
if self.action.actions:
|
||||
action = self.action.actions[0]
|
||||
if isinstance(action, Action):
|
||||
return infer_args_from_func(action.action, self.arg_metadata)
|
||||
elif callable(action):
|
||||
return infer_args_from_func(action, self.arg_metadata)
|
||||
elif isinstance(self.action, ActionGroup):
|
||||
arg_defs = same_argument_definitions(
|
||||
self.action.actions, self.arg_metadata
|
||||
)
|
||||
if arg_defs:
|
||||
return arg_defs
|
||||
logger.debug(
|
||||
"[Command:%s] auto_args disabled: mismatched ActionGroup arguments",
|
||||
self.key,
|
||||
)
|
||||
if isinstance(self.action, BaseAction):
|
||||
infer_target, maybe_metadata = self.action.get_infer_target()
|
||||
# merge metadata with the action's metadata if not already in self.arg_metadata
|
||||
if maybe_metadata:
|
||||
self.arg_metadata = {**maybe_metadata, **self.arg_metadata}
|
||||
return infer_args_from_func(infer_target, self.arg_metadata)
|
||||
elif callable(self.action):
|
||||
return infer_args_from_func(self.action, self.arg_metadata)
|
||||
return []
|
||||
@ -241,30 +217,9 @@ class Command(BaseModel):
|
||||
if self.logging_hooks and isinstance(self.action, BaseAction):
|
||||
register_debug_hooks(self.action.hooks)
|
||||
|
||||
if self.requires_input is None and self.detect_requires_input:
|
||||
self.requires_input = True
|
||||
self.hidden = True
|
||||
elif self.requires_input is None:
|
||||
self.requires_input = False
|
||||
|
||||
for arg_def in self.get_argument_definitions():
|
||||
self.arg_parser.add_argument(*arg_def.pop("flags"), **arg_def)
|
||||
|
||||
@cached_property
|
||||
def detect_requires_input(self) -> bool:
|
||||
"""Detect if the action requires input based on its type."""
|
||||
if isinstance(self.action, BaseIOAction):
|
||||
return True
|
||||
elif isinstance(self.action, ChainedAction):
|
||||
return (
|
||||
isinstance(self.action.actions[0], BaseIOAction)
|
||||
if self.action.actions
|
||||
else False
|
||||
)
|
||||
elif isinstance(self.action, ActionGroup):
|
||||
return any(isinstance(action, BaseIOAction) for action in self.action.actions)
|
||||
return False
|
||||
|
||||
def _inject_options_manager(self) -> None:
|
||||
"""Inject the options manager into the action if applicable."""
|
||||
if isinstance(self.action, BaseAction):
|
||||
@ -357,7 +312,7 @@ class Command(BaseModel):
|
||||
|
||||
def show_help(self) -> bool:
|
||||
"""Display the help message for the command."""
|
||||
if self.custom_help:
|
||||
if callable(self.custom_help):
|
||||
output = self.custom_help()
|
||||
if output:
|
||||
console.print(output)
|
||||
|
@ -98,7 +98,6 @@ class RawCommand(BaseModel):
|
||||
retry: bool = False
|
||||
retry_all: bool = False
|
||||
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
|
||||
requires_input: bool | None = None
|
||||
hidden: bool = False
|
||||
help_text: str = ""
|
||||
|
||||
|
106
falyx/falyx.py
106
falyx/falyx.py
@ -61,9 +61,9 @@ from falyx.options_manager import OptionsManager
|
||||
from falyx.parsers import CommandArgumentParser, get_arg_parsers
|
||||
from falyx.protocols import ArgParserProtocol
|
||||
from falyx.retry import RetryPolicy
|
||||
from falyx.signals import BackSignal, CancelSignal, FlowSignal, HelpSignal, QuitSignal
|
||||
from falyx.signals import BackSignal, CancelSignal, HelpSignal, QuitSignal
|
||||
from falyx.themes import OneColors, get_nord_theme
|
||||
from falyx.utils import CaseInsensitiveDict, _noop, chunks, get_program_invocation
|
||||
from falyx.utils import CaseInsensitiveDict, _noop, chunks
|
||||
from falyx.version import __version__
|
||||
|
||||
|
||||
@ -83,14 +83,17 @@ class CommandValidator(Validator):
|
||||
self.error_message = error_message
|
||||
|
||||
def validate(self, document) -> None:
|
||||
pass
|
||||
|
||||
async def validate_async(self, document) -> None:
|
||||
text = document.text
|
||||
is_preview, choice, _, __ = self.falyx.get_command(text, from_validate=True)
|
||||
is_preview, choice, _, __ = await self.falyx.get_command(text, from_validate=True)
|
||||
if is_preview:
|
||||
return None
|
||||
if not choice:
|
||||
raise ValidationError(
|
||||
message=self.error_message,
|
||||
cursor_position=document.get_end_of_document_position(),
|
||||
cursor_position=len(text),
|
||||
)
|
||||
|
||||
|
||||
@ -111,6 +114,8 @@ class Falyx:
|
||||
- Submenu nesting and action chaining
|
||||
- History tracking, help generation, and run key execution modes
|
||||
- Seamless CLI argument parsing and integration via argparse
|
||||
- Declarative option management with OptionsManager
|
||||
- Command level argument parsing and validation
|
||||
- Extensible with user-defined hooks, bottom bars, and custom layouts
|
||||
|
||||
Args:
|
||||
@ -126,7 +131,7 @@ class Falyx:
|
||||
never_prompt (bool): Seed default for `OptionsManager["never_prompt"]`
|
||||
force_confirm (bool): Seed default for `OptionsManager["force_confirm"]`
|
||||
cli_args (Namespace | None): Parsed CLI arguments, usually from argparse.
|
||||
options (OptionsManager | None): Declarative option mappings.
|
||||
options (OptionsManager | None): Declarative option mappings for global state.
|
||||
custom_table (Callable[[Falyx], Table] | Table | None): Custom menu table
|
||||
generator.
|
||||
|
||||
@ -158,8 +163,9 @@ class Falyx:
|
||||
force_confirm: bool = False,
|
||||
cli_args: Namespace | None = None,
|
||||
options: OptionsManager | None = None,
|
||||
render_menu: Callable[["Falyx"], None] | None = None,
|
||||
custom_table: Callable[["Falyx"], Table] | Table | None = None,
|
||||
render_menu: Callable[[Falyx], None] | None = None,
|
||||
custom_table: Callable[[Falyx], Table] | Table | None = None,
|
||||
hide_menu_table: bool = False,
|
||||
) -> None:
|
||||
"""Initializes the Falyx object."""
|
||||
self.title: str | Markdown = title
|
||||
@ -183,8 +189,9 @@ class Falyx:
|
||||
self._never_prompt: bool = never_prompt
|
||||
self._force_confirm: bool = force_confirm
|
||||
self.cli_args: Namespace | None = cli_args
|
||||
self.render_menu: Callable[["Falyx"], None] | None = render_menu
|
||||
self.custom_table: Callable[["Falyx"], Table] | Table | None = custom_table
|
||||
self.render_menu: Callable[[Falyx], None] | None = render_menu
|
||||
self.custom_table: Callable[[Falyx], Table] | Table | None = custom_table
|
||||
self._hide_menu_table: bool = hide_menu_table
|
||||
self.validate_options(cli_args, options)
|
||||
self._prompt_session: PromptSession | None = None
|
||||
self.mode = FalyxMode.MENU
|
||||
@ -287,8 +294,6 @@ class Falyx:
|
||||
|
||||
for command in self.commands.values():
|
||||
help_text = command.help_text or command.description
|
||||
if command.requires_input:
|
||||
help_text += " [dim](requires input)[/dim]"
|
||||
table.add_row(
|
||||
f"[{command.style}]{command.key}[/]",
|
||||
", ".join(command.aliases) if command.aliases else "",
|
||||
@ -445,7 +450,6 @@ class Falyx:
|
||||
bottom_toolbar=self._get_bottom_bar_render(),
|
||||
key_bindings=self.key_bindings,
|
||||
validate_while_typing=False,
|
||||
interrupt_exception=FlowSignal,
|
||||
)
|
||||
return self._prompt_session
|
||||
|
||||
@ -526,7 +530,7 @@ class Falyx:
|
||||
key: str = "X",
|
||||
description: str = "Exit",
|
||||
aliases: list[str] | None = None,
|
||||
action: Callable[[Any], Any] | None = None,
|
||||
action: Callable[..., Any] | None = None,
|
||||
style: str = OneColors.DARK_RED,
|
||||
confirm: bool = False,
|
||||
confirm_message: str = "Are you sure?",
|
||||
@ -580,7 +584,7 @@ class Falyx:
|
||||
self,
|
||||
key: str,
|
||||
description: str,
|
||||
action: BaseAction | Callable[[Any], Any],
|
||||
action: BaseAction | Callable[..., Any],
|
||||
*,
|
||||
args: tuple = (),
|
||||
kwargs: dict[str, Any] | None = None,
|
||||
@ -608,13 +612,12 @@ class Falyx:
|
||||
retry: bool = False,
|
||||
retry_all: bool = False,
|
||||
retry_policy: RetryPolicy | None = None,
|
||||
requires_input: bool | None = None,
|
||||
arg_parser: CommandArgumentParser | None = None,
|
||||
arguments: list[dict[str, Any]] | None = None,
|
||||
argument_config: Callable[[CommandArgumentParser], None] | None = None,
|
||||
custom_parser: ArgParserProtocol | None = None,
|
||||
custom_help: Callable[[], str | None] | None = None,
|
||||
auto_args: bool = False,
|
||||
auto_args: bool = True,
|
||||
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||
) -> Command:
|
||||
"""Adds an command to the menu, preventing duplicates."""
|
||||
@ -660,7 +663,6 @@ class Falyx:
|
||||
retry=retry,
|
||||
retry_all=retry_all,
|
||||
retry_policy=retry_policy or RetryPolicy(),
|
||||
requires_input=requires_input,
|
||||
options_manager=self.options,
|
||||
arg_parser=arg_parser,
|
||||
arguments=arguments or [],
|
||||
@ -741,7 +743,7 @@ class Falyx:
|
||||
return True, input_str[1:].strip()
|
||||
return False, input_str.strip()
|
||||
|
||||
def get_command(
|
||||
async def get_command(
|
||||
self, raw_choices: str, from_validate=False
|
||||
) -> tuple[bool, Command | None, tuple, dict[str, Any]]:
|
||||
"""
|
||||
@ -768,26 +770,29 @@ class Falyx:
|
||||
|
||||
choice = choice.upper()
|
||||
name_map = self._name_map
|
||||
if choice in name_map:
|
||||
if name_map.get(choice):
|
||||
if not from_validate:
|
||||
logger.info("Command '%s' selected.", choice)
|
||||
if input_args and name_map[choice].arg_parser:
|
||||
try:
|
||||
args, kwargs = name_map[choice].parse_args(input_args, from_validate)
|
||||
except CommandArgumentError as error:
|
||||
if not from_validate:
|
||||
if not name_map[choice].show_help():
|
||||
self.console.print(
|
||||
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
|
||||
)
|
||||
else:
|
||||
name_map[choice].show_help()
|
||||
raise ValidationError(
|
||||
message=str(error), cursor_position=len(raw_choices)
|
||||
if is_preview:
|
||||
return True, name_map[choice], args, kwargs
|
||||
try:
|
||||
args, kwargs = await name_map[choice].parse_args(
|
||||
input_args, from_validate
|
||||
)
|
||||
except CommandArgumentError as error:
|
||||
if not from_validate:
|
||||
if not name_map[choice].show_help():
|
||||
self.console.print(
|
||||
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
|
||||
)
|
||||
return is_preview, None, args, kwargs
|
||||
except HelpSignal:
|
||||
return True, None, args, kwargs
|
||||
else:
|
||||
name_map[choice].show_help()
|
||||
raise ValidationError(
|
||||
message=str(error), cursor_position=len(raw_choices)
|
||||
)
|
||||
return is_preview, None, args, kwargs
|
||||
except HelpSignal:
|
||||
return True, None, args, kwargs
|
||||
return is_preview, name_map[choice], args, kwargs
|
||||
|
||||
prefix_matches = [cmd for key, cmd in name_map.items() if key.startswith(choice)]
|
||||
@ -834,7 +839,7 @@ class Falyx:
|
||||
"""Processes the action of the selected command."""
|
||||
with patch_stdout(raw=True):
|
||||
choice = await self.prompt_session.prompt_async()
|
||||
is_preview, selected_command, args, kwargs = self.get_command(choice)
|
||||
is_preview, selected_command, args, kwargs = await self.get_command(choice)
|
||||
if not selected_command:
|
||||
logger.info("Invalid command '%s'.", choice)
|
||||
return True
|
||||
@ -844,15 +849,6 @@ class Falyx:
|
||||
await selected_command.preview()
|
||||
return True
|
||||
|
||||
if selected_command.requires_input:
|
||||
program = get_program_invocation()
|
||||
self.console.print(
|
||||
f"[{OneColors.LIGHT_YELLOW}]⚠️ Command '{selected_command.key}' requires"
|
||||
f" input and must be run via [{OneColors.MAGENTA}]'{program} run"
|
||||
f"'[{OneColors.LIGHT_YELLOW}] with proper piping or arguments.[/]"
|
||||
)
|
||||
return True
|
||||
|
||||
self.last_run_command = selected_command
|
||||
|
||||
if selected_command == self.exit_command:
|
||||
@ -885,7 +881,7 @@ class Falyx:
|
||||
) -> Any:
|
||||
"""Run a command by key without displaying the menu (non-interactive mode)."""
|
||||
self.debug_hooks()
|
||||
is_preview, selected_command, _, __ = self.get_command(command_key)
|
||||
is_preview, selected_command, _, __ = await self.get_command(command_key)
|
||||
kwargs = kwargs or {}
|
||||
|
||||
self.last_run_command = selected_command
|
||||
@ -984,10 +980,11 @@ class Falyx:
|
||||
self.print_message(self.welcome_message)
|
||||
try:
|
||||
while True:
|
||||
if callable(self.render_menu):
|
||||
self.render_menu(self)
|
||||
else:
|
||||
self.console.print(self.table, justify="center")
|
||||
if not self.options.get("hide_menu_table", self._hide_menu_table):
|
||||
if callable(self.render_menu):
|
||||
self.render_menu(self)
|
||||
else:
|
||||
self.console.print(self.table, justify="center")
|
||||
try:
|
||||
task = asyncio.create_task(self.process_command())
|
||||
should_continue = await task
|
||||
@ -1020,6 +1017,9 @@ class Falyx:
|
||||
if not self.options.get("force_confirm"):
|
||||
self.options.set("force_confirm", self._force_confirm)
|
||||
|
||||
if not self.options.get("hide_menu_table"):
|
||||
self.options.set("hide_menu_table", self._hide_menu_table)
|
||||
|
||||
if self.cli_args.verbose:
|
||||
logging.getLogger("falyx").setLevel(logging.DEBUG)
|
||||
|
||||
@ -1037,7 +1037,7 @@ class Falyx:
|
||||
|
||||
if self.cli_args.command == "preview":
|
||||
self.mode = FalyxMode.PREVIEW
|
||||
_, command, args, kwargs = self.get_command(self.cli_args.name)
|
||||
_, command, args, kwargs = await self.get_command(self.cli_args.name)
|
||||
if not command:
|
||||
self.console.print(
|
||||
f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found."
|
||||
@ -1051,7 +1051,7 @@ class Falyx:
|
||||
|
||||
if self.cli_args.command == "run":
|
||||
self.mode = FalyxMode.RUN
|
||||
is_preview, command, _, __ = self.get_command(self.cli_args.name)
|
||||
is_preview, command, _, __ = await self.get_command(self.cli_args.name)
|
||||
if is_preview:
|
||||
if command is None:
|
||||
sys.exit(1)
|
||||
@ -1062,7 +1062,7 @@ class Falyx:
|
||||
sys.exit(1)
|
||||
self._set_retry_policy(command)
|
||||
try:
|
||||
args, kwargs = command.parse_args(self.cli_args.command_args)
|
||||
args, kwargs = await command.parse_args(self.cli_args.command_args)
|
||||
except HelpSignal:
|
||||
sys.exit(0)
|
||||
try:
|
||||
|
@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable, Dict, List, Optional, Union
|
||||
from typing import Awaitable, Callable, Union
|
||||
|
||||
from falyx.context import ExecutionContext
|
||||
from falyx.logger import logger
|
||||
@ -24,7 +24,7 @@ class HookType(Enum):
|
||||
ON_TEARDOWN = "on_teardown"
|
||||
|
||||
@classmethod
|
||||
def choices(cls) -> List[HookType]:
|
||||
def choices(cls) -> list[HookType]:
|
||||
"""Return a list of all hook type choices."""
|
||||
return list(cls)
|
||||
|
||||
@ -37,16 +37,17 @@ class HookManager:
|
||||
"""HookManager"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._hooks: Dict[HookType, List[Hook]] = {
|
||||
self._hooks: dict[HookType, list[Hook]] = {
|
||||
hook_type: [] for hook_type in HookType
|
||||
}
|
||||
|
||||
def register(self, hook_type: HookType, hook: Hook):
|
||||
if hook_type not in HookType:
|
||||
raise ValueError(f"Unsupported hook type: {hook_type}")
|
||||
def register(self, hook_type: HookType | str, hook: Hook):
|
||||
"""Raises ValueError if the hook type is not supported."""
|
||||
if not isinstance(hook_type, HookType):
|
||||
hook_type = HookType(hook_type)
|
||||
self._hooks[hook_type].append(hook)
|
||||
|
||||
def clear(self, hook_type: Optional[HookType] = None):
|
||||
def clear(self, hook_type: HookType | None = None):
|
||||
if hook_type:
|
||||
self._hooks[hook_type] = []
|
||||
else:
|
||||
|
@ -2,6 +2,8 @@ from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from prompt_toolkit.formatted_text import FormattedText
|
||||
|
||||
from falyx.action import BaseAction
|
||||
from falyx.signals import BackSignal, QuitSignal
|
||||
from falyx.themes import OneColors
|
||||
@ -26,6 +28,12 @@ class MenuOption:
|
||||
"""Render the menu option for display."""
|
||||
return f"[{OneColors.WHITE}][{key}][/] [{self.style}]{self.description}[/]"
|
||||
|
||||
def render_prompt(self, key: str) -> FormattedText:
|
||||
"""Render the menu option for prompt display."""
|
||||
return FormattedText(
|
||||
[(OneColors.WHITE, f"[{key}] "), (self.style, self.description)]
|
||||
)
|
||||
|
||||
|
||||
class MenuOptionMap(CaseInsensitiveDict):
|
||||
"""
|
||||
@ -33,7 +41,7 @@ class MenuOptionMap(CaseInsensitiveDict):
|
||||
and special signal entries like Quit and Back.
|
||||
"""
|
||||
|
||||
RESERVED_KEYS = {"Q", "B"}
|
||||
RESERVED_KEYS = {"B", "X"}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -49,14 +57,14 @@ class MenuOptionMap(CaseInsensitiveDict):
|
||||
def _inject_reserved_defaults(self):
|
||||
from falyx.action import SignalAction
|
||||
|
||||
self._add_reserved(
|
||||
"Q",
|
||||
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
|
||||
)
|
||||
self._add_reserved(
|
||||
"B",
|
||||
MenuOption("Back", SignalAction("Back", BackSignal()), OneColors.DARK_YELLOW),
|
||||
)
|
||||
self._add_reserved(
|
||||
"X",
|
||||
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
|
||||
)
|
||||
|
||||
def _add_reserved(self, key: str, option: MenuOption) -> None:
|
||||
"""Add a reserved key, bypassing validation."""
|
||||
@ -78,8 +86,20 @@ class MenuOptionMap(CaseInsensitiveDict):
|
||||
raise ValueError(f"Cannot delete reserved option '{key}'.")
|
||||
super().__delitem__(key)
|
||||
|
||||
def update(self, other=None, **kwargs):
|
||||
"""Update the selection options with another dictionary."""
|
||||
if other:
|
||||
for key, option in other.items():
|
||||
if not isinstance(option, MenuOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
self[key] = option
|
||||
for key, option in kwargs.items():
|
||||
if not isinstance(option, MenuOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
self[key] = option
|
||||
|
||||
def items(self, include_reserved: bool = True):
|
||||
for k, v in super().items():
|
||||
if not include_reserved and k in self.RESERVED_KEYS:
|
||||
for key, option in super().items():
|
||||
if not include_reserved and key in self.RESERVED_KEYS:
|
||||
continue
|
||||
yield k, v
|
||||
yield key, option
|
||||
|
@ -7,8 +7,6 @@ Licensed under the MIT License. See LICENSE file for details.
|
||||
|
||||
from .argparse import Argument, ArgumentAction, CommandArgumentParser
|
||||
from .parsers import FalyxParsers, get_arg_parsers
|
||||
from .signature import infer_args_from_func
|
||||
from .utils import same_argument_definitions
|
||||
|
||||
__all__ = [
|
||||
"Argument",
|
||||
@ -16,6 +14,4 @@ __all__ = [
|
||||
"CommandArgumentParser",
|
||||
"get_arg_parsers",
|
||||
"FalyxParsers",
|
||||
"infer_args_from_func",
|
||||
"same_argument_definitions",
|
||||
]
|
||||
|
@ -1,4 +1,6 @@
|
||||
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
|
||||
from __future__ import annotations
|
||||
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
@ -23,12 +25,21 @@ class ArgumentAction(Enum):
|
||||
COUNT = "count"
|
||||
HELP = "help"
|
||||
|
||||
@classmethod
|
||||
def choices(cls) -> list[ArgumentAction]:
|
||||
"""Return a list of all argument actions."""
|
||||
return list(cls)
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""Return the string representation of the argument action."""
|
||||
return self.value
|
||||
|
||||
|
||||
@dataclass
|
||||
class Argument:
|
||||
"""Represents a command-line argument."""
|
||||
|
||||
flags: list[str]
|
||||
flags: tuple[str, ...]
|
||||
dest: str # Destination name for the argument
|
||||
action: ArgumentAction = (
|
||||
ArgumentAction.STORE
|
||||
@ -38,7 +49,7 @@ class Argument:
|
||||
choices: list[str] | None = None # List of valid choices for the argument
|
||||
required: bool = False # True if the argument is required
|
||||
help: str = "" # Help text for the argument
|
||||
nargs: int | str = 1 # int, '?', '*', '+'
|
||||
nargs: int | str | None = None # int, '?', '*', '+', None
|
||||
positional: bool = False # True if no leading - or -- in flags
|
||||
|
||||
def get_positional_text(self) -> str:
|
||||
@ -66,7 +77,11 @@ class Argument:
|
||||
and not self.positional
|
||||
):
|
||||
choice_text = self.dest.upper()
|
||||
elif isinstance(self.nargs, str):
|
||||
elif self.action in (
|
||||
ArgumentAction.STORE,
|
||||
ArgumentAction.APPEND,
|
||||
ArgumentAction.EXTEND,
|
||||
) or isinstance(self.nargs, str):
|
||||
choice_text = self.dest
|
||||
|
||||
if self.nargs == "?":
|
||||
@ -136,6 +151,7 @@ class CommandArgumentParser:
|
||||
aliases: list[str] | None = None,
|
||||
) -> None:
|
||||
"""Initialize the CommandArgumentParser."""
|
||||
self.console = Console(color_system="auto")
|
||||
self.command_key: str = command_key
|
||||
self.command_description: str = command_description
|
||||
self.command_style: str = command_style
|
||||
@ -148,7 +164,6 @@ class CommandArgumentParser:
|
||||
self._flag_map: dict[str, Argument] = {}
|
||||
self._dest_set: set[str] = set()
|
||||
self._add_help()
|
||||
self.console = Console(color_system="auto")
|
||||
|
||||
def _add_help(self):
|
||||
"""Add help argument to the parser."""
|
||||
@ -170,9 +185,7 @@ class CommandArgumentParser:
|
||||
raise CommandArgumentError("Positional arguments cannot have multiple flags")
|
||||
return positional
|
||||
|
||||
def _get_dest_from_flags(
|
||||
self, flags: tuple[str, ...], dest: str | None
|
||||
) -> str | None:
|
||||
def _get_dest_from_flags(self, flags: tuple[str, ...], dest: str | None) -> str:
|
||||
"""Convert flags to a destination name."""
|
||||
if dest:
|
||||
if not dest.replace("_", "").isalnum():
|
||||
@ -201,7 +214,7 @@ class CommandArgumentParser:
|
||||
return dest
|
||||
|
||||
def _determine_required(
|
||||
self, required: bool, positional: bool, nargs: int | str
|
||||
self, required: bool, positional: bool, nargs: int | str | None
|
||||
) -> bool:
|
||||
"""Determine if the argument is required."""
|
||||
if required:
|
||||
@ -219,7 +232,22 @@ class CommandArgumentParser:
|
||||
|
||||
return required
|
||||
|
||||
def _validate_nargs(self, nargs: int | str) -> int | str:
|
||||
def _validate_nargs(
|
||||
self, nargs: int | str | None, action: ArgumentAction
|
||||
) -> int | str | None:
|
||||
if action in (
|
||||
ArgumentAction.STORE_FALSE,
|
||||
ArgumentAction.STORE_TRUE,
|
||||
ArgumentAction.COUNT,
|
||||
ArgumentAction.HELP,
|
||||
):
|
||||
if nargs is not None:
|
||||
raise CommandArgumentError(
|
||||
f"nargs cannot be specified for {action} actions"
|
||||
)
|
||||
return None
|
||||
if nargs is None:
|
||||
nargs = 1
|
||||
allowed_nargs = ("?", "*", "+")
|
||||
if isinstance(nargs, int):
|
||||
if nargs <= 0:
|
||||
@ -231,7 +259,9 @@ class CommandArgumentParser:
|
||||
raise CommandArgumentError(f"nargs must be an int or one of {allowed_nargs}")
|
||||
return nargs
|
||||
|
||||
def _normalize_choices(self, choices: Iterable, expected_type: Any) -> list[Any]:
|
||||
def _normalize_choices(
|
||||
self, choices: Iterable | None, expected_type: Any
|
||||
) -> list[Any]:
|
||||
if choices is not None:
|
||||
if isinstance(choices, dict):
|
||||
raise CommandArgumentError("choices cannot be a dict")
|
||||
@ -278,8 +308,34 @@ class CommandArgumentParser:
|
||||
f"Default list value {default!r} for '{dest}' cannot be coerced to {expected_type.__name__}"
|
||||
)
|
||||
|
||||
def _validate_action(
|
||||
self, action: ArgumentAction | str, positional: bool
|
||||
) -> ArgumentAction:
|
||||
if not isinstance(action, ArgumentAction):
|
||||
try:
|
||||
action = ArgumentAction(action)
|
||||
except ValueError:
|
||||
raise CommandArgumentError(
|
||||
f"Invalid action '{action}' is not a valid ArgumentAction"
|
||||
)
|
||||
if action in (
|
||||
ArgumentAction.STORE_TRUE,
|
||||
ArgumentAction.STORE_FALSE,
|
||||
ArgumentAction.COUNT,
|
||||
ArgumentAction.HELP,
|
||||
):
|
||||
if positional:
|
||||
raise CommandArgumentError(
|
||||
f"Action '{action}' cannot be used with positional arguments"
|
||||
)
|
||||
|
||||
return action
|
||||
|
||||
def _resolve_default(
|
||||
self, action: ArgumentAction, default: Any, nargs: str | int
|
||||
self,
|
||||
default: Any,
|
||||
action: ArgumentAction,
|
||||
nargs: str | int | None,
|
||||
) -> Any:
|
||||
"""Get the default value for the argument."""
|
||||
if default is None:
|
||||
@ -313,7 +369,18 @@ class CommandArgumentParser:
|
||||
f"Flag '{flag}' must be a single character or start with '--'"
|
||||
)
|
||||
|
||||
def add_argument(self, *flags, **kwargs):
|
||||
def add_argument(
|
||||
self,
|
||||
*flags,
|
||||
action: str | ArgumentAction = "store",
|
||||
nargs: int | str | None = None,
|
||||
default: Any = None,
|
||||
type: Any = str,
|
||||
choices: Iterable | None = None,
|
||||
required: bool = False,
|
||||
help: str = "",
|
||||
dest: str | None = None,
|
||||
) -> None:
|
||||
"""Add an argument to the parser.
|
||||
Args:
|
||||
name or flags: Either a name or prefixed flags (e.g. 'faylx', '-f', '--falyx').
|
||||
@ -326,9 +393,10 @@ class CommandArgumentParser:
|
||||
help: A brief description of the argument.
|
||||
dest: The name of the attribute to be added to the object returned by parse_args().
|
||||
"""
|
||||
expected_type = type
|
||||
self._validate_flags(flags)
|
||||
positional = self._is_positional(flags)
|
||||
dest = self._get_dest_from_flags(flags, kwargs.get("dest"))
|
||||
dest = self._get_dest_from_flags(flags, dest)
|
||||
if dest in self._dest_set:
|
||||
raise CommandArgumentError(
|
||||
f"Destination '{dest}' is already defined.\n"
|
||||
@ -336,18 +404,9 @@ class CommandArgumentParser:
|
||||
"is not supported. Define a unique 'dest' for each argument."
|
||||
)
|
||||
self._dest_set.add(dest)
|
||||
action = kwargs.get("action", ArgumentAction.STORE)
|
||||
if not isinstance(action, ArgumentAction):
|
||||
try:
|
||||
action = ArgumentAction(action)
|
||||
except ValueError:
|
||||
raise CommandArgumentError(
|
||||
f"Invalid action '{action}' is not a valid ArgumentAction"
|
||||
)
|
||||
flags = list(flags)
|
||||
nargs = self._validate_nargs(kwargs.get("nargs", 1))
|
||||
default = self._resolve_default(action, kwargs.get("default"), nargs)
|
||||
expected_type = kwargs.get("type", str)
|
||||
action = self._validate_action(action, positional)
|
||||
nargs = self._validate_nargs(nargs, action)
|
||||
default = self._resolve_default(default, action, nargs)
|
||||
if (
|
||||
action in (ArgumentAction.STORE, ArgumentAction.APPEND, ArgumentAction.EXTEND)
|
||||
and default is not None
|
||||
@ -356,14 +415,12 @@ class CommandArgumentParser:
|
||||
self._validate_default_list_type(default, expected_type, dest)
|
||||
else:
|
||||
self._validate_default_type(default, expected_type, dest)
|
||||
choices = self._normalize_choices(kwargs.get("choices"), expected_type)
|
||||
choices = self._normalize_choices(choices, expected_type)
|
||||
if default is not None and choices and default not in choices:
|
||||
raise CommandArgumentError(
|
||||
f"Default value '{default}' not in allowed choices: {choices}"
|
||||
)
|
||||
required = self._determine_required(
|
||||
kwargs.get("required", False), positional, nargs
|
||||
)
|
||||
required = self._determine_required(required, positional, nargs)
|
||||
argument = Argument(
|
||||
flags=flags,
|
||||
dest=dest,
|
||||
@ -372,7 +429,7 @@ class CommandArgumentParser:
|
||||
default=default,
|
||||
choices=choices,
|
||||
required=required,
|
||||
help=kwargs.get("help", ""),
|
||||
help=help,
|
||||
nargs=nargs,
|
||||
positional=positional,
|
||||
)
|
||||
@ -415,11 +472,11 @@ class CommandArgumentParser:
|
||||
values = []
|
||||
i = start
|
||||
if isinstance(spec.nargs, int):
|
||||
# assert i + spec.nargs <= len(
|
||||
# args
|
||||
# ), "Not enough arguments provided: shouldn't happen"
|
||||
values = args[i : i + spec.nargs]
|
||||
return values, i + spec.nargs
|
||||
elif spec.nargs is None:
|
||||
values = [args[i]]
|
||||
return values, i + 1
|
||||
elif spec.nargs == "+":
|
||||
if i >= len(args):
|
||||
raise CommandArgumentError(
|
||||
@ -464,6 +521,8 @@ class CommandArgumentParser:
|
||||
for next_spec in positional_args[j + 1 :]:
|
||||
if isinstance(next_spec.nargs, int):
|
||||
min_required += next_spec.nargs
|
||||
elif next_spec.nargs is None:
|
||||
min_required += 1
|
||||
elif next_spec.nargs == "+":
|
||||
min_required += 1
|
||||
elif next_spec.nargs == "?":
|
||||
@ -506,7 +565,7 @@ class CommandArgumentParser:
|
||||
|
||||
return i
|
||||
|
||||
def parse_args(
|
||||
async def parse_args(
|
||||
self, args: list[str] | None = None, from_validate: bool = False
|
||||
) -> dict[str, Any]:
|
||||
"""Parse Falyx Command arguments."""
|
||||
@ -654,7 +713,7 @@ class CommandArgumentParser:
|
||||
result.pop("help", None)
|
||||
return result
|
||||
|
||||
def parse_args_split(
|
||||
async def parse_args_split(
|
||||
self, args: list[str], from_validate: bool = False
|
||||
) -> tuple[tuple[Any, ...], dict[str, Any]]:
|
||||
"""
|
||||
@ -662,7 +721,7 @@ class CommandArgumentParser:
|
||||
tuple[args, kwargs] - Positional arguments in defined order,
|
||||
followed by keyword argument mapping.
|
||||
"""
|
||||
parsed = self.parse_args(args, from_validate)
|
||||
parsed = await self.parse_args(args, from_validate)
|
||||
args_list = []
|
||||
kwargs_dict = {}
|
||||
for arg in self._arguments:
|
||||
|
@ -114,7 +114,7 @@ def get_arg_parsers(
|
||||
help="Skip confirmation prompts",
|
||||
)
|
||||
|
||||
run_group.add_argument(
|
||||
run_parser.add_argument(
|
||||
"command_args",
|
||||
nargs=REMAINDER,
|
||||
help="Arguments to pass to the command (if applicable)",
|
||||
|
@ -1,17 +1,20 @@
|
||||
import inspect
|
||||
from typing import Any, Callable
|
||||
|
||||
from falyx import logger
|
||||
from falyx.logger import logger
|
||||
|
||||
|
||||
def infer_args_from_func(
|
||||
func: Callable[[Any], Any],
|
||||
func: Callable[[Any], Any] | None,
|
||||
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Infer argument definitions from a callable's signature.
|
||||
Returns a list of kwargs suitable for CommandArgumentParser.add_argument.
|
||||
"""
|
||||
if not callable(func):
|
||||
logger.debug("Provided argument is not callable: %s", func)
|
||||
return []
|
||||
arg_metadata = arg_metadata or {}
|
||||
signature = inspect.signature(func)
|
||||
arg_defs = []
|
||||
@ -39,7 +42,7 @@ def infer_args_from_func(
|
||||
else:
|
||||
flags = [f"--{name.replace('_', '-')}"]
|
||||
action = "store"
|
||||
nargs: int | str = 1
|
||||
nargs: int | str | None = None
|
||||
|
||||
if arg_type is bool:
|
||||
if param.default is False:
|
||||
|
@ -1,7 +1,6 @@
|
||||
from typing import Any
|
||||
|
||||
from falyx import logger
|
||||
from falyx.action.action import Action, ChainedAction, ProcessAction
|
||||
from falyx.parsers.signature import infer_args_from_func
|
||||
|
||||
|
||||
@ -9,17 +8,13 @@ def same_argument_definitions(
|
||||
actions: list[Any],
|
||||
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||
) -> list[dict[str, Any]] | None:
|
||||
from falyx.action.action import BaseAction
|
||||
|
||||
arg_sets = []
|
||||
for action in actions:
|
||||
if isinstance(action, (Action, ProcessAction)):
|
||||
arg_defs = infer_args_from_func(action.action, arg_metadata)
|
||||
elif isinstance(action, ChainedAction):
|
||||
if action.actions:
|
||||
action = action.actions[0]
|
||||
if isinstance(action, Action):
|
||||
arg_defs = infer_args_from_func(action.action, arg_metadata)
|
||||
elif callable(action):
|
||||
arg_defs = infer_args_from_func(action, arg_metadata)
|
||||
if isinstance(action, BaseAction):
|
||||
infer_target, _ = action.get_infer_target()
|
||||
arg_defs = infer_args_from_func(infer_target, arg_metadata)
|
||||
elif callable(action):
|
||||
arg_defs = infer_args_from_func(action, arg_metadata)
|
||||
else:
|
||||
|
@ -10,7 +10,7 @@ from rich.markup import escape
|
||||
from rich.table import Table
|
||||
|
||||
from falyx.themes import OneColors
|
||||
from falyx.utils import chunks
|
||||
from falyx.utils import CaseInsensitiveDict, chunks
|
||||
from falyx.validators import int_range_validator, key_validator
|
||||
|
||||
|
||||
@ -32,6 +32,62 @@ class SelectionOption:
|
||||
return f"[{OneColors.WHITE}]{key}[/] [{self.style}]{self.description}[/]"
|
||||
|
||||
|
||||
class SelectionOptionMap(CaseInsensitiveDict):
|
||||
"""
|
||||
Manages selection options including validation and reserved key protection.
|
||||
"""
|
||||
|
||||
RESERVED_KEYS: set[str] = set()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
options: dict[str, SelectionOption] | None = None,
|
||||
allow_reserved: bool = False,
|
||||
):
|
||||
super().__init__()
|
||||
self.allow_reserved = allow_reserved
|
||||
if options:
|
||||
self.update(options)
|
||||
|
||||
def _add_reserved(self, key: str, option: SelectionOption) -> None:
|
||||
"""Add a reserved key, bypassing validation."""
|
||||
norm_key = key.upper()
|
||||
super().__setitem__(norm_key, option)
|
||||
|
||||
def __setitem__(self, key: str, option: SelectionOption) -> None:
|
||||
if not isinstance(option, SelectionOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
norm_key = key.upper()
|
||||
if norm_key in self.RESERVED_KEYS and not self.allow_reserved:
|
||||
raise ValueError(
|
||||
f"Key '{key}' is reserved and cannot be used in SelectionOptionMap."
|
||||
)
|
||||
super().__setitem__(norm_key, option)
|
||||
|
||||
def __delitem__(self, key: str) -> None:
|
||||
if key.upper() in self.RESERVED_KEYS and not self.allow_reserved:
|
||||
raise ValueError(f"Cannot delete reserved option '{key}'.")
|
||||
super().__delitem__(key)
|
||||
|
||||
def update(self, other=None, **kwargs):
|
||||
"""Update the selection options with another dictionary."""
|
||||
if other:
|
||||
for key, option in other.items():
|
||||
if not isinstance(option, SelectionOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
self[key] = option
|
||||
for key, option in kwargs.items():
|
||||
if not isinstance(option, SelectionOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
self[key] = option
|
||||
|
||||
def items(self, include_reserved: bool = True):
|
||||
for k, v in super().items():
|
||||
if not include_reserved and k in self.RESERVED_KEYS:
|
||||
continue
|
||||
yield k, v
|
||||
|
||||
|
||||
def render_table_base(
|
||||
title: str,
|
||||
*,
|
||||
@ -215,7 +271,7 @@ async def prompt_for_index(
|
||||
prompt_session: PromptSession | None = None,
|
||||
prompt_message: str = "Select an option > ",
|
||||
show_table: bool = True,
|
||||
):
|
||||
) -> int:
|
||||
prompt_session = prompt_session or PromptSession()
|
||||
console = console or Console(color_system="auto")
|
||||
|
||||
|
@ -1 +1 @@
|
||||
__version__ = "0.1.29"
|
||||
__version__ = "0.1.35"
|
||||
|
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "falyx"
|
||||
version = "0.1.29"
|
||||
version = "0.1.35"
|
||||
description = "Reliable and introspectable async CLI action framework."
|
||||
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
|
||||
license = "MIT"
|
||||
|
@ -1,7 +1,7 @@
|
||||
# test_command.py
|
||||
import pytest
|
||||
|
||||
from falyx.action import Action, ActionGroup, BaseIOAction, ChainedAction
|
||||
from falyx.action import Action, BaseIOAction, ChainedAction
|
||||
from falyx.command import Command
|
||||
from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.retry import RetryPolicy
|
||||
@ -56,102 +56,6 @@ def test_command_str():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"action_factory, expected_requires_input",
|
||||
[
|
||||
(lambda: Action(name="normal", action=dummy_action), False),
|
||||
(lambda: DummyInputAction(name="io"), True),
|
||||
(
|
||||
lambda: ChainedAction(name="chain", actions=[DummyInputAction(name="io")]),
|
||||
True,
|
||||
),
|
||||
(
|
||||
lambda: ActionGroup(name="group", actions=[DummyInputAction(name="io")]),
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_command_requires_input_detection(action_factory, expected_requires_input):
|
||||
action = action_factory()
|
||||
cmd = Command(key="TEST", description="Test Command", action=action)
|
||||
|
||||
assert cmd.requires_input == expected_requires_input
|
||||
if expected_requires_input:
|
||||
assert cmd.hidden is True
|
||||
else:
|
||||
assert cmd.hidden is False
|
||||
|
||||
|
||||
def test_requires_input_flag_detected_for_baseioaction():
|
||||
"""Command should automatically detect requires_input=True for BaseIOAction."""
|
||||
cmd = Command(
|
||||
key="X",
|
||||
description="Echo input",
|
||||
action=DummyInputAction(name="dummy"),
|
||||
)
|
||||
assert cmd.requires_input is True
|
||||
assert cmd.hidden is True
|
||||
|
||||
|
||||
def test_requires_input_manual_override():
|
||||
"""Command manually set requires_input=False should not auto-hide."""
|
||||
cmd = Command(
|
||||
key="Y",
|
||||
description="Custom input command",
|
||||
action=DummyInputAction(name="dummy"),
|
||||
requires_input=False,
|
||||
)
|
||||
assert cmd.requires_input is False
|
||||
assert cmd.hidden is False
|
||||
|
||||
|
||||
def test_default_command_does_not_require_input():
|
||||
"""Normal Command without IO Action should not require input."""
|
||||
cmd = Command(
|
||||
key="Z",
|
||||
description="Simple action",
|
||||
action=lambda: 42,
|
||||
)
|
||||
assert cmd.requires_input is False
|
||||
assert cmd.hidden is False
|
||||
|
||||
|
||||
def test_chain_requires_input():
|
||||
"""If first action in a chain requires input, the command should require input."""
|
||||
chain = ChainedAction(
|
||||
name="ChainWithInput",
|
||||
actions=[
|
||||
DummyInputAction(name="dummy"),
|
||||
Action(name="action1", action=lambda: 1),
|
||||
],
|
||||
)
|
||||
cmd = Command(
|
||||
key="A",
|
||||
description="Chain with input",
|
||||
action=chain,
|
||||
)
|
||||
assert cmd.requires_input is True
|
||||
assert cmd.hidden is True
|
||||
|
||||
|
||||
def test_group_requires_input():
|
||||
"""If any action in a group requires input, the command should require input."""
|
||||
group = ActionGroup(
|
||||
name="GroupWithInput",
|
||||
actions=[
|
||||
Action(name="action1", action=lambda: 1),
|
||||
DummyInputAction(name="dummy"),
|
||||
],
|
||||
)
|
||||
cmd = Command(
|
||||
key="B",
|
||||
description="Group with input",
|
||||
action=group,
|
||||
)
|
||||
assert cmd.requires_input is True
|
||||
assert cmd.hidden is True
|
||||
|
||||
|
||||
def test_enable_retry():
|
||||
"""Command should enable retry if action is an Action and retry is set to True."""
|
||||
cmd = Command(
|
||||
|
@ -5,98 +5,109 @@ from falyx.parsers import ArgumentAction, CommandArgumentParser
|
||||
from falyx.signals import HelpSignal
|
||||
|
||||
|
||||
def build_parser_and_parse(args, config):
|
||||
async def build_parser_and_parse(args, config):
|
||||
cap = CommandArgumentParser()
|
||||
config(cap)
|
||||
return cap.parse_args(args)
|
||||
return await cap.parse_args(args)
|
||||
|
||||
|
||||
def test_none():
|
||||
@pytest.mark.asyncio
|
||||
async def test_none():
|
||||
def config(parser):
|
||||
parser.add_argument("--foo", type=str)
|
||||
|
||||
parsed = build_parser_and_parse(None, config)
|
||||
parsed = await build_parser_and_parse(None, config)
|
||||
assert parsed["foo"] is None
|
||||
|
||||
|
||||
def test_append_multiple_flags():
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_multiple_flags():
|
||||
def config(parser):
|
||||
parser.add_argument("--tag", action=ArgumentAction.APPEND, type=str)
|
||||
|
||||
parsed = build_parser_and_parse(["--tag", "a", "--tag", "b", "--tag", "c"], config)
|
||||
parsed = await build_parser_and_parse(
|
||||
["--tag", "a", "--tag", "b", "--tag", "c"], config
|
||||
)
|
||||
assert parsed["tag"] == ["a", "b", "c"]
|
||||
|
||||
|
||||
def test_positional_nargs_plus_and_single():
|
||||
@pytest.mark.asyncio
|
||||
async def test_positional_nargs_plus_and_single():
|
||||
def config(parser):
|
||||
parser.add_argument("files", nargs="+", type=str)
|
||||
parser.add_argument("mode", nargs=1)
|
||||
|
||||
parsed = build_parser_and_parse(["a", "b", "c", "prod"], config)
|
||||
parsed = await build_parser_and_parse(["a", "b", "c", "prod"], config)
|
||||
assert parsed["files"] == ["a", "b", "c"]
|
||||
assert parsed["mode"] == "prod"
|
||||
|
||||
|
||||
def test_type_validation_failure():
|
||||
@pytest.mark.asyncio
|
||||
async def test_type_validation_failure():
|
||||
def config(parser):
|
||||
parser.add_argument("--count", type=int)
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
build_parser_and_parse(["--count", "abc"], config)
|
||||
await build_parser_and_parse(["--count", "abc"], config)
|
||||
|
||||
|
||||
def test_required_field_missing():
|
||||
@pytest.mark.asyncio
|
||||
async def test_required_field_missing():
|
||||
def config(parser):
|
||||
parser.add_argument("--env", type=str, required=True)
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
build_parser_and_parse([], config)
|
||||
await build_parser_and_parse([], config)
|
||||
|
||||
|
||||
def test_choices_enforced():
|
||||
@pytest.mark.asyncio
|
||||
async def test_choices_enforced():
|
||||
def config(parser):
|
||||
parser.add_argument("--mode", choices=["dev", "prod"])
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
build_parser_and_parse(["--mode", "staging"], config)
|
||||
await build_parser_and_parse(["--mode", "staging"], config)
|
||||
|
||||
|
||||
def test_boolean_flags():
|
||||
@pytest.mark.asyncio
|
||||
async def test_boolean_flags():
|
||||
def config(parser):
|
||||
parser.add_argument("--debug", action=ArgumentAction.STORE_TRUE)
|
||||
parser.add_argument("--no-debug", action=ArgumentAction.STORE_FALSE)
|
||||
|
||||
parsed = build_parser_and_parse(["--debug", "--no-debug"], config)
|
||||
parsed = await build_parser_and_parse(["--debug", "--no-debug"], config)
|
||||
assert parsed["debug"] is True
|
||||
assert parsed["no_debug"] is False
|
||||
parsed = build_parser_and_parse([], config)
|
||||
print(parsed)
|
||||
parsed = await build_parser_and_parse([], config)
|
||||
assert parsed["debug"] is False
|
||||
assert parsed["no_debug"] is True
|
||||
|
||||
|
||||
def test_count_action():
|
||||
@pytest.mark.asyncio
|
||||
async def test_count_action():
|
||||
def config(parser):
|
||||
parser.add_argument("-v", action=ArgumentAction.COUNT)
|
||||
|
||||
parsed = build_parser_and_parse(["-v", "-v", "-v"], config)
|
||||
parsed = await build_parser_and_parse(["-v", "-v", "-v"], config)
|
||||
assert parsed["v"] == 3
|
||||
|
||||
|
||||
def test_nargs_star():
|
||||
@pytest.mark.asyncio
|
||||
async def test_nargs_star():
|
||||
def config(parser):
|
||||
parser.add_argument("args", nargs="*", type=str)
|
||||
|
||||
parsed = build_parser_and_parse(["one", "two", "three"], config)
|
||||
parsed = await build_parser_and_parse(["one", "two", "three"], config)
|
||||
assert parsed["args"] == ["one", "two", "three"]
|
||||
|
||||
|
||||
def test_flag_and_positional_mix():
|
||||
@pytest.mark.asyncio
|
||||
async def test_flag_and_positional_mix():
|
||||
def config(parser):
|
||||
parser.add_argument("--env", type=str)
|
||||
parser.add_argument("tasks", nargs="+")
|
||||
|
||||
parsed = build_parser_and_parse(["--env", "prod", "build", "test"], config)
|
||||
parsed = await build_parser_and_parse(["--env", "prod", "build", "test"], config)
|
||||
assert parsed["env"] == "prod"
|
||||
assert parsed["tasks"] == ["build", "test"]
|
||||
|
||||
@ -134,7 +145,7 @@ def test_add_argument_multiple_optional_flags_same_dest():
|
||||
parser.add_argument("-f", "--falyx")
|
||||
arg = parser._arguments[-1]
|
||||
assert arg.dest == "falyx"
|
||||
assert arg.flags == ["-f", "--falyx"]
|
||||
assert arg.flags == ("-f", "--falyx")
|
||||
|
||||
|
||||
def test_add_argument_flag_dest_conflict():
|
||||
@ -165,7 +176,7 @@ def test_add_argument_multiple_flags_custom_dest():
|
||||
parser.add_argument("-f", "--falyx", "--test", dest="falyx")
|
||||
arg = parser._arguments[-1]
|
||||
assert arg.dest == "falyx"
|
||||
assert arg.flags == ["-f", "--falyx", "--test"]
|
||||
assert arg.flags == ("-f", "--falyx", "--test")
|
||||
|
||||
|
||||
def test_add_argument_multiple_flags_dest():
|
||||
@ -175,7 +186,7 @@ def test_add_argument_multiple_flags_dest():
|
||||
parser.add_argument("-f", "--falyx", "--test")
|
||||
arg = parser._arguments[-1]
|
||||
assert arg.dest == "falyx"
|
||||
assert arg.flags == ["-f", "--falyx", "--test"]
|
||||
assert arg.flags == ("-f", "--falyx", "--test")
|
||||
|
||||
|
||||
def test_add_argument_single_flag_dest():
|
||||
@ -185,7 +196,7 @@ def test_add_argument_single_flag_dest():
|
||||
parser.add_argument("-f")
|
||||
arg = parser._arguments[-1]
|
||||
assert arg.dest == "f"
|
||||
assert arg.flags == ["-f"]
|
||||
assert arg.flags == ("-f",)
|
||||
|
||||
|
||||
def test_add_argument_bad_dest():
|
||||
@ -257,7 +268,7 @@ def test_add_argument_default_value():
|
||||
parser.add_argument("--falyx", default="default_value")
|
||||
arg = parser._arguments[-1]
|
||||
assert arg.dest == "falyx"
|
||||
assert arg.flags == ["--falyx"]
|
||||
assert arg.flags == ("--falyx",)
|
||||
assert arg.default == "default_value"
|
||||
|
||||
|
||||
@ -297,20 +308,21 @@ def test_add_argument_default_not_in_choices():
|
||||
parser.add_argument("--falyx", choices=["a", "b"], default="c")
|
||||
|
||||
|
||||
def test_add_argument_choices():
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_argument_choices():
|
||||
parser = CommandArgumentParser()
|
||||
|
||||
# ✅ Choices provided
|
||||
parser.add_argument("--falyx", choices=["a", "b", "c"])
|
||||
arg = parser._arguments[-1]
|
||||
assert arg.dest == "falyx"
|
||||
assert arg.flags == ["--falyx"]
|
||||
assert arg.flags == ("--falyx",)
|
||||
assert arg.choices == ["a", "b", "c"]
|
||||
|
||||
args = parser.parse_args(["--falyx", "a"])
|
||||
args = await parser.parse_args(["--falyx", "a"])
|
||||
assert args["falyx"] == "a"
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args(["--falyx", "d"])
|
||||
await parser.parse_args(["--falyx", "d"])
|
||||
|
||||
|
||||
def test_add_argument_choices_invalid():
|
||||
@ -352,7 +364,7 @@ def test_add_argument_nargs():
|
||||
parser.add_argument("--falyx", nargs=2)
|
||||
arg = parser._arguments[-1]
|
||||
assert arg.dest == "falyx"
|
||||
assert arg.flags == ["--falyx"]
|
||||
assert arg.flags == ("--falyx",)
|
||||
assert arg.nargs == 2
|
||||
|
||||
|
||||
@ -377,56 +389,60 @@ def test_get_argument():
|
||||
parser.add_argument("--falyx", type=str, default="default_value")
|
||||
arg = parser.get_argument("falyx")
|
||||
assert arg.dest == "falyx"
|
||||
assert arg.flags == ["--falyx"]
|
||||
assert arg.flags == ("--falyx",)
|
||||
assert arg.default == "default_value"
|
||||
|
||||
|
||||
def test_parse_args_nargs():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", nargs="+", type=str)
|
||||
parser.add_argument("mode", nargs=1)
|
||||
|
||||
args = parser.parse_args(["a", "b", "c"])
|
||||
args = await parser.parse_args(["a", "b", "c"])
|
||||
|
||||
assert args["files"] == ["a", "b"]
|
||||
assert args["mode"] == "c"
|
||||
|
||||
|
||||
def test_parse_args_nargs_plus():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs_plus():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", nargs="+", type=str)
|
||||
|
||||
args = parser.parse_args(["a", "b", "c"])
|
||||
args = await parser.parse_args(["a", "b", "c"])
|
||||
assert args["files"] == ["a", "b", "c"]
|
||||
|
||||
args = parser.parse_args(["a"])
|
||||
args = await parser.parse_args(["a"])
|
||||
assert args["files"] == ["a"]
|
||||
|
||||
|
||||
def test_parse_args_flagged_nargs_plus():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_flagged_nargs_plus():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--files", nargs="+", type=str)
|
||||
|
||||
args = parser.parse_args(["--files", "a", "b", "c"])
|
||||
args = await parser.parse_args(["--files", "a", "b", "c"])
|
||||
assert args["files"] == ["a", "b", "c"]
|
||||
|
||||
args = parser.parse_args(["--files", "a"])
|
||||
args = await parser.parse_args(["--files", "a"])
|
||||
print(args)
|
||||
assert args["files"] == ["a"]
|
||||
|
||||
args = parser.parse_args([])
|
||||
args = await parser.parse_args([])
|
||||
assert args["files"] == []
|
||||
|
||||
|
||||
def test_parse_args_numbered_nargs():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_numbered_nargs():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", nargs=2, type=str)
|
||||
|
||||
args = parser.parse_args(["a", "b"])
|
||||
args = await parser.parse_args(["a", "b"])
|
||||
assert args["files"] == ["a", "b"]
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
args = parser.parse_args(["a"])
|
||||
args = await parser.parse_args(["a"])
|
||||
print(args)
|
||||
|
||||
|
||||
@ -436,48 +452,53 @@ def test_parse_args_nargs_zero():
|
||||
parser.add_argument("files", nargs=0, type=str)
|
||||
|
||||
|
||||
def test_parse_args_nargs_more_than_expected():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs_more_than_expected():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", nargs=2, type=str)
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args(["a", "b", "c", "d"])
|
||||
await parser.parse_args(["a", "b", "c", "d"])
|
||||
|
||||
|
||||
def test_parse_args_nargs_one_or_none():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs_one_or_none():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", nargs="?", type=str)
|
||||
|
||||
args = parser.parse_args(["a"])
|
||||
args = await parser.parse_args(["a"])
|
||||
assert args["files"] == "a"
|
||||
|
||||
args = parser.parse_args([])
|
||||
args = await parser.parse_args([])
|
||||
assert args["files"] is None
|
||||
|
||||
|
||||
def test_parse_args_nargs_positional():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs_positional():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", nargs="*", type=str)
|
||||
|
||||
args = parser.parse_args(["a", "b", "c"])
|
||||
args = await parser.parse_args(["a", "b", "c"])
|
||||
assert args["files"] == ["a", "b", "c"]
|
||||
|
||||
args = parser.parse_args([])
|
||||
args = await parser.parse_args([])
|
||||
assert args["files"] == []
|
||||
|
||||
|
||||
def test_parse_args_nargs_positional_plus():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs_positional_plus():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", nargs="+", type=str)
|
||||
|
||||
args = parser.parse_args(["a", "b", "c"])
|
||||
args = await parser.parse_args(["a", "b", "c"])
|
||||
assert args["files"] == ["a", "b", "c"]
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
args = parser.parse_args([])
|
||||
args = await parser.parse_args([])
|
||||
|
||||
|
||||
def test_parse_args_nargs_multiple_positional():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs_multiple_positional():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", nargs="+", type=str)
|
||||
parser.add_argument("mode", nargs=1)
|
||||
@ -485,7 +506,7 @@ def test_parse_args_nargs_multiple_positional():
|
||||
parser.add_argument("target", nargs="*")
|
||||
parser.add_argument("extra", nargs="+")
|
||||
|
||||
args = parser.parse_args(["a", "b", "c", "d", "e"])
|
||||
args = await parser.parse_args(["a", "b", "c", "d", "e"])
|
||||
assert args["files"] == ["a", "b", "c"]
|
||||
assert args["mode"] == "d"
|
||||
assert args["action"] == []
|
||||
@ -493,186 +514,209 @@ def test_parse_args_nargs_multiple_positional():
|
||||
assert args["extra"] == ["e"]
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args([])
|
||||
await parser.parse_args([])
|
||||
|
||||
|
||||
def test_parse_args_nargs_invalid_positional_arguments():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs_invalid_positional_arguments():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("numbers", nargs="*", type=int)
|
||||
parser.add_argument("mode", nargs=1)
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args(["1", "2", "c", "d"])
|
||||
await parser.parse_args(["1", "2", "c", "d"])
|
||||
|
||||
|
||||
def test_parse_args_append():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_append():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int)
|
||||
|
||||
args = parser.parse_args(["--numbers", "1", "--numbers", "2", "--numbers", "3"])
|
||||
args = await parser.parse_args(["--numbers", "1", "--numbers", "2", "--numbers", "3"])
|
||||
assert args["numbers"] == [1, 2, 3]
|
||||
|
||||
args = parser.parse_args(["--numbers", "1"])
|
||||
args = await parser.parse_args(["--numbers", "1"])
|
||||
assert args["numbers"] == [1]
|
||||
|
||||
args = parser.parse_args([])
|
||||
args = await parser.parse_args([])
|
||||
assert args["numbers"] == []
|
||||
|
||||
|
||||
def test_parse_args_nargs_append():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_nargs_append():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("numbers", action=ArgumentAction.APPEND, type=int, nargs="*")
|
||||
parser.add_argument("--mode")
|
||||
|
||||
args = parser.parse_args(["1", "2", "3", "--mode", "numbers", "4", "5"])
|
||||
args = await parser.parse_args(["1", "2", "3", "--mode", "numbers", "4", "5"])
|
||||
assert args["numbers"] == [[1, 2, 3], [4, 5]]
|
||||
|
||||
args = parser.parse_args(["1"])
|
||||
args = await parser.parse_args(["1"])
|
||||
assert args["numbers"] == [[1]]
|
||||
|
||||
args = parser.parse_args([])
|
||||
args = await parser.parse_args([])
|
||||
assert args["numbers"] == []
|
||||
|
||||
|
||||
def test_parse_args_append_flagged_invalid_type():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_append_flagged_invalid_type():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int)
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args(["--numbers", "a"])
|
||||
await parser.parse_args(["--numbers", "a"])
|
||||
|
||||
|
||||
def test_append_groups_nargs():
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_groups_nargs():
|
||||
cap = CommandArgumentParser()
|
||||
cap.add_argument("--item", action=ArgumentAction.APPEND, type=str, nargs=2)
|
||||
|
||||
parsed = cap.parse_args(["--item", "a", "b", "--item", "c", "d"])
|
||||
parsed = await cap.parse_args(["--item", "a", "b", "--item", "c", "d"])
|
||||
assert parsed["item"] == [["a", "b"], ["c", "d"]]
|
||||
|
||||
|
||||
def test_extend_flattened():
|
||||
@pytest.mark.asyncio
|
||||
async def test_extend_flattened():
|
||||
cap = CommandArgumentParser()
|
||||
cap.add_argument("--value", action=ArgumentAction.EXTEND, type=str)
|
||||
|
||||
parsed = cap.parse_args(["--value", "x", "--value", "y"])
|
||||
parsed = await cap.parse_args(["--value", "x", "--value", "y"])
|
||||
assert parsed["value"] == ["x", "y"]
|
||||
|
||||
|
||||
def test_parse_args_split_order():
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_args_split_order():
|
||||
cap = CommandArgumentParser()
|
||||
cap.add_argument("a")
|
||||
cap.add_argument("--x")
|
||||
cap.add_argument("b", nargs="*")
|
||||
args, kwargs = cap.parse_args_split(["1", "--x", "100", "2"])
|
||||
args, kwargs = await cap.parse_args_split(["1", "--x", "100", "2"])
|
||||
assert args == ("1", ["2"])
|
||||
assert kwargs == {"x": "100"}
|
||||
|
||||
|
||||
def test_help_signal_triggers():
|
||||
@pytest.mark.asyncio
|
||||
async def test_help_signal_triggers():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--foo")
|
||||
with pytest.raises(HelpSignal):
|
||||
parser.parse_args(["--help"])
|
||||
await parser.parse_args(["--help"])
|
||||
|
||||
|
||||
def test_empty_parser_defaults():
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_parser_defaults():
|
||||
parser = CommandArgumentParser()
|
||||
with pytest.raises(HelpSignal):
|
||||
parser.parse_args(["--help"])
|
||||
await parser.parse_args(["--help"])
|
||||
|
||||
|
||||
def test_extend_basic():
|
||||
@pytest.mark.asyncio
|
||||
async def test_extend_basic():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--tag", action=ArgumentAction.EXTEND, type=str)
|
||||
|
||||
args = parser.parse_args(["--tag", "a", "--tag", "b", "--tag", "c"])
|
||||
args = await parser.parse_args(["--tag", "a", "--tag", "b", "--tag", "c"])
|
||||
assert args["tag"] == ["a", "b", "c"]
|
||||
|
||||
|
||||
def test_extend_nargs_2():
|
||||
@pytest.mark.asyncio
|
||||
async def test_extend_nargs_2():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--pair", action=ArgumentAction.EXTEND, type=str, nargs=2)
|
||||
|
||||
args = parser.parse_args(["--pair", "a", "b", "--pair", "c", "d"])
|
||||
args = await parser.parse_args(["--pair", "a", "b", "--pair", "c", "d"])
|
||||
assert args["pair"] == ["a", "b", "c", "d"]
|
||||
|
||||
|
||||
def test_extend_nargs_star():
|
||||
@pytest.mark.asyncio
|
||||
async def test_extend_nargs_star():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--files", action=ArgumentAction.EXTEND, type=str, nargs="*")
|
||||
|
||||
args = parser.parse_args(["--files", "x", "y", "z"])
|
||||
args = await parser.parse_args(["--files", "x", "y", "z"])
|
||||
assert args["files"] == ["x", "y", "z"]
|
||||
|
||||
args = parser.parse_args(["--files"])
|
||||
args = await parser.parse_args(["--files"])
|
||||
assert args["files"] == []
|
||||
|
||||
|
||||
def test_extend_nargs_plus():
|
||||
@pytest.mark.asyncio
|
||||
async def test_extend_nargs_plus():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--inputs", action=ArgumentAction.EXTEND, type=int, nargs="+")
|
||||
|
||||
args = parser.parse_args(["--inputs", "1", "2", "3", "--inputs", "4"])
|
||||
args = await parser.parse_args(["--inputs", "1", "2", "3", "--inputs", "4"])
|
||||
assert args["inputs"] == [1, 2, 3, 4]
|
||||
|
||||
|
||||
def test_extend_invalid_type():
|
||||
@pytest.mark.asyncio
|
||||
async def test_extend_invalid_type():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--nums", action=ArgumentAction.EXTEND, type=int)
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args(["--nums", "a"])
|
||||
await parser.parse_args(["--nums", "a"])
|
||||
|
||||
|
||||
def test_greedy_invalid_type():
|
||||
@pytest.mark.asyncio
|
||||
async def test_greedy_invalid_type():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--nums", nargs="*", type=int)
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args(["--nums", "a"])
|
||||
await parser.parse_args(["--nums", "a"])
|
||||
|
||||
|
||||
def test_append_vs_extend_behavior():
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_vs_extend_behavior():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2)
|
||||
parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2)
|
||||
|
||||
args = parser.parse_args(
|
||||
args = await parser.parse_args(
|
||||
["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3", "4"]
|
||||
)
|
||||
assert args["x"] == [["a", "b"], ["c", "d"]]
|
||||
assert args["y"] == ["1", "2", "3", "4"]
|
||||
|
||||
|
||||
def test_append_vs_extend_behavior_error():
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_vs_extend_behavior_error():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2)
|
||||
parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2)
|
||||
|
||||
# This should raise an error because the last argument is not a valid pair
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args(["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3"])
|
||||
await parser.parse_args(
|
||||
["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3"]
|
||||
)
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args(["--x", "a", "b", "--x", "c", "--y", "1", "--y", "3", "4"])
|
||||
await parser.parse_args(
|
||||
["--x", "a", "b", "--x", "c", "--y", "1", "--y", "3", "4"]
|
||||
)
|
||||
|
||||
|
||||
def test_extend_positional():
|
||||
@pytest.mark.asyncio
|
||||
async def test_extend_positional():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="*")
|
||||
|
||||
args = parser.parse_args(["a", "b", "c"])
|
||||
args = await parser.parse_args(["a", "b", "c"])
|
||||
assert args["files"] == ["a", "b", "c"]
|
||||
|
||||
args = parser.parse_args([])
|
||||
args = await parser.parse_args([])
|
||||
assert args["files"] == []
|
||||
|
||||
|
||||
def test_extend_positional_nargs():
|
||||
@pytest.mark.asyncio
|
||||
async def test_extend_positional_nargs():
|
||||
parser = CommandArgumentParser()
|
||||
parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="+")
|
||||
|
||||
args = parser.parse_args(["a", "b", "c"])
|
||||
args = await parser.parse_args(["a", "b", "c"])
|
||||
assert args["files"] == ["a", "b", "c"]
|
||||
|
||||
with pytest.raises(CommandArgumentError):
|
||||
parser.parse_args([])
|
||||
await parser.parse_args([])
|
||||
|
Reference in New Issue
Block a user