Compare commits
7 Commits
command-ar
...
b0c0e7dc16
Author | SHA1 | Date | |
---|---|---|---|
b0c0e7dc16
|
|||
0a1ba22a3d
|
|||
b51ba87999
|
|||
3c0a81359c
|
|||
4fa6e3bf1f | |||
afa47b0bac
|
|||
70a527358d |
39
examples/auto_args_group.py
Normal file
39
examples/auto_args_group.py
Normal file
@ -0,0 +1,39 @@
|
||||
import asyncio
|
||||
|
||||
from falyx import Action, ActionGroup, Command, Falyx
|
||||
|
||||
|
||||
# Define a shared async function
|
||||
async def say_hello(name: str, excited: bool = False):
|
||||
if excited:
|
||||
print(f"Hello, {name}!!!")
|
||||
else:
|
||||
print(f"Hello, {name}.")
|
||||
|
||||
|
||||
# Wrap the same callable in multiple Actions
|
||||
action1 = Action("say_hello_1", action=say_hello)
|
||||
action2 = Action("say_hello_2", action=say_hello)
|
||||
action3 = Action("say_hello_3", action=say_hello)
|
||||
|
||||
# Combine into an ActionGroup
|
||||
group = ActionGroup(name="greet_group", actions=[action1, action2, action3])
|
||||
|
||||
# Create the Command with auto_args=True
|
||||
cmd = Command(
|
||||
key="G",
|
||||
description="Greet someone with multiple variations.",
|
||||
action=group,
|
||||
arg_metadata={
|
||||
"name": {
|
||||
"help": "The name of the person to greet.",
|
||||
},
|
||||
"excited": {
|
||||
"help": "Whether to greet excitedly.",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
flx = Falyx("Test Group")
|
||||
flx.add_command_from_command(cmd)
|
||||
asyncio.run(flx.run())
|
54
examples/auto_parse_demo.py
Normal file
54
examples/auto_parse_demo.py
Normal file
@ -0,0 +1,54 @@
|
||||
import asyncio
|
||||
|
||||
from falyx import Action, ChainedAction, Falyx
|
||||
from falyx.utils import setup_logging
|
||||
|
||||
setup_logging()
|
||||
|
||||
|
||||
async def deploy(service: str, region: str = "us-east-1", verbose: bool = False) -> str:
|
||||
if verbose:
|
||||
print(f"Deploying {service} to {region}...")
|
||||
await asyncio.sleep(2)
|
||||
if verbose:
|
||||
print(f"{service} deployed successfully!")
|
||||
return f"{service} deployed to {region}"
|
||||
|
||||
|
||||
flx = Falyx("Deployment CLI")
|
||||
|
||||
flx.add_command(
|
||||
key="D",
|
||||
aliases=["deploy"],
|
||||
description="Deploy a service to a specified region.",
|
||||
action=Action(
|
||||
name="deploy_service",
|
||||
action=deploy,
|
||||
),
|
||||
arg_metadata={
|
||||
"service": "Service name",
|
||||
"region": {"help": "Deployment region", "choices": ["us-east-1", "us-west-2"]},
|
||||
"verbose": {"help": "Enable verbose mode"},
|
||||
},
|
||||
)
|
||||
|
||||
deploy_chain = ChainedAction(
|
||||
name="DeployChain",
|
||||
actions=[
|
||||
Action(name="deploy_service", action=deploy),
|
||||
Action(
|
||||
name="notify",
|
||||
action=lambda last_result: print(f"Notification: {last_result}"),
|
||||
),
|
||||
],
|
||||
auto_inject=True,
|
||||
)
|
||||
|
||||
flx.add_command(
|
||||
key="N",
|
||||
aliases=["notify"],
|
||||
description="Deploy a service and notify.",
|
||||
action=deploy_chain,
|
||||
)
|
||||
|
||||
asyncio.run(flx.run())
|
@ -4,7 +4,6 @@ from rich.console import Console
|
||||
|
||||
from falyx import ActionGroup, Falyx
|
||||
from falyx.action import HTTPAction
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.hooks import ResultReporter
|
||||
|
||||
console = Console()
|
||||
@ -49,7 +48,7 @@ action_group = ActionGroup(
|
||||
reporter = ResultReporter()
|
||||
|
||||
action_group.hooks.register(
|
||||
HookType.ON_SUCCESS,
|
||||
"on_success",
|
||||
reporter.report,
|
||||
)
|
||||
|
||||
|
@ -3,7 +3,6 @@ import asyncio
|
||||
from falyx import Action, ActionGroup, ChainedAction
|
||||
from falyx import ExecutionRegistry as er
|
||||
from falyx import ProcessAction
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.retry import RetryHandler, RetryPolicy
|
||||
|
||||
|
||||
@ -47,7 +46,7 @@ def build_pipeline():
|
||||
checkout = Action("Checkout", checkout_code)
|
||||
analysis = ProcessAction("Static Analysis", run_static_analysis)
|
||||
tests = Action("Run Tests", flaky_tests)
|
||||
tests.hooks.register(HookType.ON_ERROR, retry_handler.retry_on_error)
|
||||
tests.hooks.register("on_error", retry_handler.retry_on_error)
|
||||
|
||||
# Parallel deploys
|
||||
deploy_group = ActionGroup(
|
||||
|
@ -1,22 +1,26 @@
|
||||
import asyncio
|
||||
|
||||
from falyx.selection import (
|
||||
SelectionOption,
|
||||
prompt_for_selection,
|
||||
render_selection_dict_table,
|
||||
)
|
||||
from falyx.action import SelectionAction
|
||||
from falyx.selection import SelectionOption
|
||||
|
||||
menu = {
|
||||
"A": SelectionOption("Run diagnostics", lambda: print("Running diagnostics...")),
|
||||
"B": SelectionOption("Deploy to staging", lambda: print("Deploying...")),
|
||||
selections = {
|
||||
"1": SelectionOption(
|
||||
description="Production", value="3bc2616e-3696-11f0-a139-089204eb86ac"
|
||||
),
|
||||
"2": SelectionOption(
|
||||
description="Staging", value="42f2cd84-3696-11f0-a139-089204eb86ac"
|
||||
),
|
||||
}
|
||||
|
||||
table = render_selection_dict_table(
|
||||
title="Main Menu",
|
||||
selections=menu,
|
||||
|
||||
select = SelectionAction(
|
||||
name="Select Deployment",
|
||||
selections=selections,
|
||||
title="Select a Deployment",
|
||||
columns=2,
|
||||
prompt_message="> ",
|
||||
return_type="value",
|
||||
show_table=True,
|
||||
)
|
||||
|
||||
key = asyncio.run(prompt_for_selection(menu.keys(), table))
|
||||
print(f"You selected: {key}")
|
||||
|
||||
menu[key.upper()].value()
|
||||
print(asyncio.run(select()))
|
||||
|
@ -3,7 +3,6 @@ import asyncio
|
||||
|
||||
from falyx import Action, ChainedAction, Falyx
|
||||
from falyx.action import ShellAction
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.hooks import ResultReporter
|
||||
from falyx.utils import setup_logging
|
||||
|
||||
@ -42,12 +41,12 @@ reporter = ResultReporter()
|
||||
|
||||
a1 = Action("a1", a1, inject_last_result=True)
|
||||
a1.hooks.register(
|
||||
HookType.ON_SUCCESS,
|
||||
"on_success",
|
||||
reporter.report,
|
||||
)
|
||||
a2 = Action("a2", a2, inject_last_result=True)
|
||||
a2.hooks.register(
|
||||
HookType.ON_SUCCESS,
|
||||
"on_success",
|
||||
reporter.report,
|
||||
)
|
||||
|
||||
|
@ -12,7 +12,6 @@ from .command import Command
|
||||
from .context import ExecutionContext, SharedContext
|
||||
from .execution_registry import ExecutionRegistry
|
||||
from .falyx import Falyx
|
||||
from .hook_manager import HookType
|
||||
|
||||
logger = logging.getLogger("falyx")
|
||||
|
||||
|
0
falyx/action/.pytyped
Normal file
0
falyx/action/.pytyped
Normal file
@ -47,6 +47,7 @@ from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.hook_manager import Hook, HookManager, HookType
|
||||
from falyx.logger import logger
|
||||
from falyx.options_manager import OptionsManager
|
||||
from falyx.parsers.utils import same_argument_definitions
|
||||
from falyx.retry import RetryHandler, RetryPolicy
|
||||
from falyx.themes import OneColors
|
||||
from falyx.utils import ensure_async
|
||||
@ -61,8 +62,7 @@ class BaseAction(ABC):
|
||||
inject_last_result (bool): Whether to inject the previous action's result
|
||||
into kwargs.
|
||||
inject_into (str): The name of the kwarg key to inject the result as
|
||||
(default: 'last_result').
|
||||
_requires_injection (bool): Whether the action requires input injection.
|
||||
(default: 'last_result').
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -82,7 +82,6 @@ class BaseAction(ABC):
|
||||
self.inject_last_result: bool = inject_last_result
|
||||
self.inject_into: str = inject_into
|
||||
self._never_prompt: bool = never_prompt
|
||||
self._requires_injection: bool = False
|
||||
self._skip_in_chain: bool = False
|
||||
self.console = Console(color_system="auto")
|
||||
self.options_manager: OptionsManager | None = None
|
||||
@ -101,6 +100,14 @@ class BaseAction(ABC):
|
||||
async def preview(self, parent: Tree | None = None):
|
||||
raise NotImplementedError("preview must be implemented by subclasses")
|
||||
|
||||
@abstractmethod
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
"""
|
||||
Returns the callable to be used for argument inference.
|
||||
By default, it returns None.
|
||||
"""
|
||||
raise NotImplementedError("get_infer_target must be implemented by subclasses")
|
||||
|
||||
def set_options_manager(self, options_manager: OptionsManager) -> None:
|
||||
self.options_manager = options_manager
|
||||
|
||||
@ -154,10 +161,6 @@ class BaseAction(ABC):
|
||||
async def _write_stdout(self, data: str) -> None:
|
||||
"""Override in subclasses that produce terminal output."""
|
||||
|
||||
def requires_io_injection(self) -> bool:
|
||||
"""Checks to see if the action requires input injection."""
|
||||
return self._requires_injection
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return str(self)
|
||||
|
||||
@ -246,6 +249,13 @@ class Action(BaseAction):
|
||||
if policy.enabled:
|
||||
self.enable_retry()
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
|
||||
"""
|
||||
Returns the callable to be used for argument inference.
|
||||
By default, it returns the action itself.
|
||||
"""
|
||||
return self.action, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
combined_args = args + self.args
|
||||
combined_kwargs = self._maybe_inject_last_result({**self.kwargs, **kwargs})
|
||||
@ -477,6 +487,14 @@ class ChainedAction(BaseAction, ActionListMixin):
|
||||
if hasattr(action, "register_teardown") and callable(action.register_teardown):
|
||||
action.register_teardown(self.hooks)
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
if self.actions:
|
||||
return self.actions[0].get_infer_target()
|
||||
return None, None
|
||||
|
||||
def _clear_args(self):
|
||||
return (), {}
|
||||
|
||||
async def _run(self, *args, **kwargs) -> list[Any]:
|
||||
if not self.actions:
|
||||
raise EmptyChainError(f"[{self.name}] No actions to execute.")
|
||||
@ -505,12 +523,8 @@ class ChainedAction(BaseAction, ActionListMixin):
|
||||
continue
|
||||
shared_context.current_index = index
|
||||
prepared = action.prepare(shared_context, self.options_manager)
|
||||
last_result = shared_context.last_result()
|
||||
try:
|
||||
if self.requires_io_injection() and last_result is not None:
|
||||
result = await prepared(**{prepared.inject_into: last_result})
|
||||
else:
|
||||
result = await prepared(*args, **updated_kwargs)
|
||||
result = await prepared(*args, **updated_kwargs)
|
||||
except Exception as error:
|
||||
if index + 1 < len(self.actions) and isinstance(
|
||||
self.actions[index + 1], FallbackAction
|
||||
@ -529,6 +543,7 @@ class ChainedAction(BaseAction, ActionListMixin):
|
||||
fallback._skip_in_chain = True
|
||||
else:
|
||||
raise
|
||||
args, updated_kwargs = self._clear_args()
|
||||
shared_context.add_result(result)
|
||||
context.extra["results"].append(result)
|
||||
context.extra["rollback_stack"].append(prepared)
|
||||
@ -669,6 +684,16 @@ class ActionGroup(BaseAction, ActionListMixin):
|
||||
if hasattr(action, "register_teardown") and callable(action.register_teardown):
|
||||
action.register_teardown(self.hooks)
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
arg_defs = same_argument_definitions(self.actions)
|
||||
if arg_defs:
|
||||
return self.actions[0].get_infer_target()
|
||||
logger.debug(
|
||||
"[%s] auto_args disabled: mismatched ActionGroup arguments",
|
||||
self.name,
|
||||
)
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> list[tuple[str, Any]]:
|
||||
shared_context = SharedContext(name=self.name, action=self, is_parallel=True)
|
||||
if self.shared_context:
|
||||
@ -787,8 +812,11 @@ class ProcessAction(BaseAction):
|
||||
self.executor = executor or ProcessPoolExecutor()
|
||||
self.is_retryable = True
|
||||
|
||||
async def _run(self, *args, **kwargs):
|
||||
if self.inject_last_result:
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, None]:
|
||||
return self.action, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
if self.inject_last_result and self.shared_context:
|
||||
last_result = self.shared_context.last_result()
|
||||
if not self._validate_pickleable(last_result):
|
||||
raise ValueError(
|
||||
|
@ -1,6 +1,6 @@
|
||||
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
|
||||
"""action_factory.py"""
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
from rich.tree import Tree
|
||||
|
||||
@ -35,6 +35,8 @@ class ActionFactoryAction(BaseAction):
|
||||
*,
|
||||
inject_last_result: bool = False,
|
||||
inject_into: str = "last_result",
|
||||
args: tuple[Any, ...] = (),
|
||||
kwargs: dict[str, Any] | None = None,
|
||||
preview_args: tuple[Any, ...] = (),
|
||||
preview_kwargs: dict[str, Any] | None = None,
|
||||
):
|
||||
@ -44,6 +46,8 @@ class ActionFactoryAction(BaseAction):
|
||||
inject_into=inject_into,
|
||||
)
|
||||
self.factory = factory
|
||||
self.args = args
|
||||
self.kwargs = kwargs or {}
|
||||
self.preview_args = preview_args
|
||||
self.preview_kwargs = preview_kwargs or {}
|
||||
|
||||
@ -55,7 +59,12 @@ class ActionFactoryAction(BaseAction):
|
||||
def factory(self, value: ActionFactoryProtocol):
|
||||
self._factory = ensure_async(value)
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any], None]:
|
||||
return self.factory, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
args = (*self.args, *args)
|
||||
kwargs = {**self.kwargs, **kwargs}
|
||||
updated_kwargs = self._maybe_inject_last_result(kwargs)
|
||||
context = ExecutionContext(
|
||||
name=f"{self.name} (factory)",
|
||||
@ -85,7 +94,7 @@ class ActionFactoryAction(BaseAction):
|
||||
)
|
||||
if self.options_manager:
|
||||
generated_action.set_options_manager(self.options_manager)
|
||||
context.result = await generated_action(*args, **kwargs)
|
||||
context.result = await generated_action()
|
||||
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
||||
return context.result
|
||||
except Exception as error:
|
||||
|
@ -19,7 +19,7 @@ import asyncio
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
from rich.tree import Tree
|
||||
|
||||
@ -73,7 +73,6 @@ class BaseIOAction(BaseAction):
|
||||
inject_last_result=inject_last_result,
|
||||
)
|
||||
self.mode = mode
|
||||
self._requires_injection = True
|
||||
|
||||
def from_input(self, raw: str | bytes) -> Any:
|
||||
raise NotImplementedError
|
||||
@ -81,15 +80,15 @@ class BaseIOAction(BaseAction):
|
||||
def to_output(self, result: Any) -> str | bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
async def _resolve_input(self, kwargs: dict[str, Any]) -> str | bytes:
|
||||
last_result = kwargs.pop(self.inject_into, None)
|
||||
|
||||
async def _resolve_input(
|
||||
self, args: tuple[Any], kwargs: dict[str, Any]
|
||||
) -> str | bytes:
|
||||
data = await self._read_stdin()
|
||||
if data:
|
||||
return self.from_input(data)
|
||||
|
||||
if last_result is not None:
|
||||
return last_result
|
||||
if len(args) == 1:
|
||||
return self.from_input(args[0])
|
||||
|
||||
if self.inject_last_result and self.shared_context:
|
||||
return self.shared_context.last_result()
|
||||
@ -99,6 +98,9 @@ class BaseIOAction(BaseAction):
|
||||
)
|
||||
raise FalyxError("No input provided and no last result to inject.")
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
return None, None
|
||||
|
||||
async def __call__(self, *args, **kwargs):
|
||||
context = ExecutionContext(
|
||||
name=self.name,
|
||||
@ -117,8 +119,8 @@ class BaseIOAction(BaseAction):
|
||||
pass
|
||||
result = getattr(self, "_last_result", None)
|
||||
else:
|
||||
parsed_input = await self._resolve_input(kwargs)
|
||||
result = await self._run(parsed_input, *args, **kwargs)
|
||||
parsed_input = await self._resolve_input(args, kwargs)
|
||||
result = await self._run(parsed_input)
|
||||
output = self.to_output(result)
|
||||
await self._write_stdout(output)
|
||||
context.result = result
|
||||
@ -195,7 +197,6 @@ class ShellAction(BaseIOAction):
|
||||
- Captures stdout and stderr from shell execution
|
||||
- Raises on non-zero exit codes with stderr as the error
|
||||
- Result is returned as trimmed stdout string
|
||||
- Compatible with ChainedAction and Command.requires_input detection
|
||||
|
||||
Args:
|
||||
name (str): Name of the action.
|
||||
@ -220,11 +221,19 @@ class ShellAction(BaseIOAction):
|
||||
)
|
||||
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
|
||||
|
||||
def get_infer_target(self) -> tuple[Callable[..., Any] | None, dict[str, Any] | None]:
|
||||
if sys.stdin.isatty():
|
||||
return self._run, {"parsed_input": {"help": self.command_template}}
|
||||
return None, None
|
||||
|
||||
async def _run(self, parsed_input: str) -> str:
|
||||
# Replace placeholder in template, or use raw input as full command
|
||||
command = self.command_template.format(parsed_input)
|
||||
if self.safe_mode:
|
||||
args = shlex.split(command)
|
||||
try:
|
||||
args = shlex.split(command)
|
||||
except ValueError as error:
|
||||
raise FalyxError(f"Invalid command template: {error}")
|
||||
result = subprocess.run(args, capture_output=True, text=True, check=True)
|
||||
else:
|
||||
result = subprocess.run(
|
||||
|
@ -73,6 +73,9 @@ class MenuAction(BaseAction):
|
||||
table.add_row(*row)
|
||||
return table
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
kwargs = self._maybe_inject_last_result(kwargs)
|
||||
context = ExecutionContext(
|
||||
|
@ -25,6 +25,7 @@ from falyx.selection import (
|
||||
prompt_for_selection,
|
||||
render_selection_dict_table,
|
||||
)
|
||||
from falyx.signals import CancelSignal
|
||||
from falyx.themes import OneColors
|
||||
|
||||
|
||||
@ -121,6 +122,16 @@ class SelectFileAction(BaseAction):
|
||||
logger.warning("[ERROR] Failed to parse %s: %s", file.name, error)
|
||||
return options
|
||||
|
||||
def _find_cancel_key(self, options) -> str:
|
||||
"""Return first numeric value not already used in the selection dict."""
|
||||
for index in range(len(options)):
|
||||
if str(index) not in options:
|
||||
return str(index)
|
||||
return str(len(options))
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
context = ExecutionContext(name=self.name, args=args, kwargs=kwargs, action=self)
|
||||
context.start_timer()
|
||||
@ -128,28 +139,38 @@ class SelectFileAction(BaseAction):
|
||||
await self.hooks.trigger(HookType.BEFORE, context)
|
||||
|
||||
files = [
|
||||
f
|
||||
for f in self.directory.iterdir()
|
||||
if f.is_file()
|
||||
and (self.suffix_filter is None or f.suffix == self.suffix_filter)
|
||||
file
|
||||
for file in self.directory.iterdir()
|
||||
if file.is_file()
|
||||
and (self.suffix_filter is None or file.suffix == self.suffix_filter)
|
||||
]
|
||||
if not files:
|
||||
raise FileNotFoundError("No files found in directory.")
|
||||
|
||||
options = self.get_options(files)
|
||||
|
||||
cancel_key = self._find_cancel_key(options)
|
||||
cancel_option = {
|
||||
cancel_key: SelectionOption(
|
||||
description="Cancel", value=CancelSignal(), style=OneColors.DARK_RED
|
||||
)
|
||||
}
|
||||
|
||||
table = render_selection_dict_table(
|
||||
title=self.title, selections=options, columns=self.columns
|
||||
title=self.title, selections=options | cancel_option, columns=self.columns
|
||||
)
|
||||
|
||||
key = await prompt_for_selection(
|
||||
options.keys(),
|
||||
(options | cancel_option).keys(),
|
||||
table,
|
||||
console=self.console,
|
||||
prompt_session=self.prompt_session,
|
||||
prompt_message=self.prompt_message,
|
||||
)
|
||||
|
||||
if key == cancel_key:
|
||||
raise CancelSignal("User canceled the selection.")
|
||||
|
||||
result = options[key].value
|
||||
context.result = result
|
||||
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
||||
@ -176,11 +197,11 @@ class SelectFileAction(BaseAction):
|
||||
try:
|
||||
files = list(self.directory.iterdir())
|
||||
if self.suffix_filter:
|
||||
files = [f for f in files if f.suffix == self.suffix_filter]
|
||||
files = [file for file in files if file.suffix == self.suffix_filter]
|
||||
sample = files[:10]
|
||||
file_list = tree.add("[dim]Files:[/]")
|
||||
for f in sample:
|
||||
file_list.add(f"[dim]{f.name}[/]")
|
||||
for file in sample:
|
||||
file_list.add(f"[dim]{file.name}[/]")
|
||||
if len(files) > 10:
|
||||
file_list.add(f"[dim]... ({len(files) - 10} more)[/]")
|
||||
except Exception as error:
|
||||
|
@ -7,19 +7,21 @@ from rich.console import Console
|
||||
from rich.tree import Tree
|
||||
|
||||
from falyx.action.action import BaseAction
|
||||
from falyx.action.types import SelectionReturnType
|
||||
from falyx.context import ExecutionContext
|
||||
from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.hook_manager import HookType
|
||||
from falyx.logger import logger
|
||||
from falyx.selection import (
|
||||
SelectionOption,
|
||||
SelectionOptionMap,
|
||||
prompt_for_index,
|
||||
prompt_for_selection,
|
||||
render_selection_dict_table,
|
||||
render_selection_indexed_table,
|
||||
)
|
||||
from falyx.signals import CancelSignal
|
||||
from falyx.themes import OneColors
|
||||
from falyx.utils import CaseInsensitiveDict
|
||||
|
||||
|
||||
class SelectionAction(BaseAction):
|
||||
@ -34,7 +36,13 @@ class SelectionAction(BaseAction):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
selections: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption],
|
||||
selections: (
|
||||
list[str]
|
||||
| set[str]
|
||||
| tuple[str, ...]
|
||||
| dict[str, SelectionOption]
|
||||
| dict[str, Any]
|
||||
),
|
||||
*,
|
||||
title: str = "Select an option",
|
||||
columns: int = 5,
|
||||
@ -42,7 +50,7 @@ class SelectionAction(BaseAction):
|
||||
default_selection: str = "",
|
||||
inject_last_result: bool = False,
|
||||
inject_into: str = "last_result",
|
||||
return_key: bool = False,
|
||||
return_type: SelectionReturnType | str = "value",
|
||||
console: Console | None = None,
|
||||
prompt_session: PromptSession | None = None,
|
||||
never_prompt: bool = False,
|
||||
@ -55,8 +63,8 @@ class SelectionAction(BaseAction):
|
||||
never_prompt=never_prompt,
|
||||
)
|
||||
# Setter normalizes to correct type, mypy can't infer that
|
||||
self.selections: list[str] | CaseInsensitiveDict = selections # type: ignore[assignment]
|
||||
self.return_key = return_key
|
||||
self.selections: list[str] | SelectionOptionMap = selections # type: ignore[assignment]
|
||||
self.return_type: SelectionReturnType = self._coerce_return_type(return_type)
|
||||
self.title = title
|
||||
self.columns = columns
|
||||
self.console = console or Console(color_system="auto")
|
||||
@ -65,8 +73,15 @@ class SelectionAction(BaseAction):
|
||||
self.prompt_message = prompt_message
|
||||
self.show_table = show_table
|
||||
|
||||
def _coerce_return_type(
|
||||
self, return_type: SelectionReturnType | str
|
||||
) -> SelectionReturnType:
|
||||
if isinstance(return_type, SelectionReturnType):
|
||||
return return_type
|
||||
return SelectionReturnType(return_type)
|
||||
|
||||
@property
|
||||
def selections(self) -> list[str] | CaseInsensitiveDict:
|
||||
def selections(self) -> list[str] | SelectionOptionMap:
|
||||
return self._selections
|
||||
|
||||
@selections.setter
|
||||
@ -74,17 +89,41 @@ class SelectionAction(BaseAction):
|
||||
self, value: list[str] | set[str] | tuple[str, ...] | dict[str, SelectionOption]
|
||||
):
|
||||
if isinstance(value, (list, tuple, set)):
|
||||
self._selections: list[str] | CaseInsensitiveDict = list(value)
|
||||
self._selections: list[str] | SelectionOptionMap = list(value)
|
||||
elif isinstance(value, dict):
|
||||
cid = CaseInsensitiveDict()
|
||||
cid.update(value)
|
||||
self._selections = cid
|
||||
som = SelectionOptionMap()
|
||||
if all(isinstance(key, str) for key in value) and all(
|
||||
not isinstance(value[key], SelectionOption) for key in value
|
||||
):
|
||||
som.update(
|
||||
{
|
||||
str(index): SelectionOption(key, option)
|
||||
for index, (key, option) in enumerate(value.items())
|
||||
}
|
||||
)
|
||||
elif all(isinstance(key, str) for key in value) and all(
|
||||
isinstance(value[key], SelectionOption) for key in value
|
||||
):
|
||||
som.update(value)
|
||||
else:
|
||||
raise ValueError("Invalid dictionary format. Keys must be strings")
|
||||
self._selections = som
|
||||
else:
|
||||
raise TypeError(
|
||||
"'selections' must be a list[str] or dict[str, SelectionOption], "
|
||||
f"got {type(value).__name__}"
|
||||
)
|
||||
|
||||
def _find_cancel_key(self) -> str:
|
||||
"""Return first numeric value not already used in the selection dict."""
|
||||
for index in range(len(self.selections)):
|
||||
if str(index) not in self.selections:
|
||||
return str(index)
|
||||
return str(len(self.selections))
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> Any:
|
||||
kwargs = self._maybe_inject_last_result(kwargs)
|
||||
context = ExecutionContext(
|
||||
@ -125,16 +164,17 @@ class SelectionAction(BaseAction):
|
||||
|
||||
context.start_timer()
|
||||
try:
|
||||
cancel_key = self._find_cancel_key()
|
||||
await self.hooks.trigger(HookType.BEFORE, context)
|
||||
if isinstance(self.selections, list):
|
||||
table = render_selection_indexed_table(
|
||||
title=self.title,
|
||||
selections=self.selections,
|
||||
selections=self.selections + ["Cancel"],
|
||||
columns=self.columns,
|
||||
)
|
||||
if not self.never_prompt:
|
||||
index = await prompt_for_index(
|
||||
len(self.selections) - 1,
|
||||
len(self.selections),
|
||||
table,
|
||||
default_selection=effective_default,
|
||||
console=self.console,
|
||||
@ -144,14 +184,23 @@ class SelectionAction(BaseAction):
|
||||
)
|
||||
else:
|
||||
index = effective_default
|
||||
result = self.selections[int(index)]
|
||||
if index == cancel_key:
|
||||
raise CancelSignal("User cancelled the selection.")
|
||||
result: Any = self.selections[int(index)]
|
||||
elif isinstance(self.selections, dict):
|
||||
cancel_option = {
|
||||
cancel_key: SelectionOption(
|
||||
description="Cancel", value=CancelSignal, style=OneColors.DARK_RED
|
||||
)
|
||||
}
|
||||
table = render_selection_dict_table(
|
||||
title=self.title, selections=self.selections, columns=self.columns
|
||||
title=self.title,
|
||||
selections=self.selections | cancel_option,
|
||||
columns=self.columns,
|
||||
)
|
||||
if not self.never_prompt:
|
||||
key = await prompt_for_selection(
|
||||
self.selections.keys(),
|
||||
(self.selections | cancel_option).keys(),
|
||||
table,
|
||||
default_selection=effective_default,
|
||||
console=self.console,
|
||||
@ -161,10 +210,25 @@ class SelectionAction(BaseAction):
|
||||
)
|
||||
else:
|
||||
key = effective_default
|
||||
result = key if self.return_key else self.selections[key].value
|
||||
if key == cancel_key:
|
||||
raise CancelSignal("User cancelled the selection.")
|
||||
if self.return_type == SelectionReturnType.KEY:
|
||||
result = key
|
||||
elif self.return_type == SelectionReturnType.VALUE:
|
||||
result = self.selections[key].value
|
||||
elif self.return_type == SelectionReturnType.ITEMS:
|
||||
result = {key: self.selections[key]}
|
||||
elif self.return_type == SelectionReturnType.DESCRIPTION:
|
||||
result = self.selections[key].description
|
||||
elif self.return_type == SelectionReturnType.DESCRIPTION_VALUE:
|
||||
result = {
|
||||
self.selections[key].description: self.selections[key].value
|
||||
}
|
||||
else:
|
||||
raise ValueError(f"Unsupported return type: {self.return_type}")
|
||||
else:
|
||||
raise TypeError(
|
||||
"'selections' must be a list[str] or dict[str, tuple[str, Any]], "
|
||||
"'selections' must be a list[str] or dict[str, Any], "
|
||||
f"got {type(self.selections).__name__}"
|
||||
)
|
||||
context.result = result
|
||||
@ -203,7 +267,7 @@ class SelectionAction(BaseAction):
|
||||
return
|
||||
|
||||
tree.add(f"[dim]Default:[/] '{self.default_selection or self.last_result}'")
|
||||
tree.add(f"[dim]Return:[/] {'Key' if self.return_key else 'Value'}")
|
||||
tree.add(f"[dim]Return:[/] {self.return_type.name.capitalize()}")
|
||||
tree.add(f"[dim]Prompt:[/] {'Disabled' if self.never_prompt else 'Enabled'}")
|
||||
|
||||
if not parent:
|
||||
@ -218,6 +282,6 @@ class SelectionAction(BaseAction):
|
||||
return (
|
||||
f"SelectionAction(name={self.name!r}, type={selection_type}, "
|
||||
f"default_selection={self.default_selection!r}, "
|
||||
f"return_key={self.return_key}, "
|
||||
f"return_type={self.return_type!r}, "
|
||||
f"prompt={'off' if self.never_prompt else 'on'})"
|
||||
)
|
||||
|
@ -35,3 +35,18 @@ class FileReturnType(Enum):
|
||||
return member
|
||||
valid = ", ".join(member.value for member in cls)
|
||||
raise ValueError(f"Invalid FileReturnType: '{value}'. Must be one of: {valid}")
|
||||
|
||||
|
||||
class SelectionReturnType(Enum):
|
||||
"""Enum for dictionary return types."""
|
||||
|
||||
KEY = "key"
|
||||
VALUE = "value"
|
||||
DESCRIPTION = "description"
|
||||
DESCRIPTION_VALUE = "description_value"
|
||||
ITEMS = "items"
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value: object) -> SelectionReturnType:
|
||||
valid = ", ".join(member.value for member in cls)
|
||||
raise ValueError(f"Invalid DictReturnType: '{value}'. Must be one of: {valid}")
|
||||
|
@ -43,6 +43,9 @@ class UserInputAction(BaseAction):
|
||||
self.console = console or Console(color_system="auto")
|
||||
self.prompt_session = prompt_session or PromptSession()
|
||||
|
||||
def get_infer_target(self) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
async def _run(self, *args, **kwargs) -> str:
|
||||
context = ExecutionContext(
|
||||
name=self.name,
|
||||
|
@ -19,7 +19,6 @@ in building robust interactive menus.
|
||||
from __future__ import annotations
|
||||
|
||||
import shlex
|
||||
from functools import cached_property
|
||||
from typing import Any, Callable
|
||||
|
||||
from prompt_toolkit.formatted_text import FormattedText
|
||||
@ -27,15 +26,15 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
|
||||
from rich.console import Console
|
||||
from rich.tree import Tree
|
||||
|
||||
from falyx.action.action import Action, ActionGroup, BaseAction, ChainedAction
|
||||
from falyx.action.io_action import BaseIOAction
|
||||
from falyx.argparse import CommandArgumentParser
|
||||
from falyx.action.action import Action, BaseAction
|
||||
from falyx.context import ExecutionContext
|
||||
from falyx.debug import register_debug_hooks
|
||||
from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.hook_manager import HookManager, HookType
|
||||
from falyx.logger import logger
|
||||
from falyx.options_manager import OptionsManager
|
||||
from falyx.parsers.argparse import CommandArgumentParser
|
||||
from falyx.parsers.signature import infer_args_from_func
|
||||
from falyx.prompt_utils import confirm_async, should_prompt_user
|
||||
from falyx.protocols import ArgParserProtocol
|
||||
from falyx.retry import RetryPolicy
|
||||
@ -89,7 +88,11 @@ class Command(BaseModel):
|
||||
retry_policy (RetryPolicy): Retry behavior configuration.
|
||||
tags (list[str]): Organizational tags for the command.
|
||||
logging_hooks (bool): Whether to attach logging hooks automatically.
|
||||
requires_input (bool | None): Indicates if the action needs input.
|
||||
options_manager (OptionsManager): Manages global command-line options.
|
||||
arg_parser (CommandArgumentParser): Parses command arguments.
|
||||
custom_parser (ArgParserProtocol | None): Custom argument parser.
|
||||
custom_help (Callable[[], str | None] | None): Custom help message generator.
|
||||
auto_args (bool): Automatically infer arguments from the action.
|
||||
|
||||
Methods:
|
||||
__call__(): Executes the command, respecting hooks and retries.
|
||||
@ -101,12 +104,13 @@ class Command(BaseModel):
|
||||
|
||||
key: str
|
||||
description: str
|
||||
action: BaseAction | Callable[[], Any]
|
||||
action: BaseAction | Callable[..., Any]
|
||||
args: tuple = ()
|
||||
kwargs: dict[str, Any] = Field(default_factory=dict)
|
||||
hidden: bool = False
|
||||
aliases: list[str] = Field(default_factory=list)
|
||||
help_text: str = ""
|
||||
help_epilogue: str = ""
|
||||
style: str = OneColors.WHITE
|
||||
confirm: bool = False
|
||||
confirm_message: str = "Are you sure?"
|
||||
@ -122,25 +126,46 @@ class Command(BaseModel):
|
||||
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
|
||||
tags: list[str] = Field(default_factory=list)
|
||||
logging_hooks: bool = False
|
||||
requires_input: bool | None = None
|
||||
options_manager: OptionsManager = Field(default_factory=OptionsManager)
|
||||
arg_parser: CommandArgumentParser = Field(default_factory=CommandArgumentParser)
|
||||
arguments: list[dict[str, Any]] = Field(default_factory=list)
|
||||
argument_config: Callable[[CommandArgumentParser], None] | None = None
|
||||
custom_parser: ArgParserProtocol | None = None
|
||||
custom_help: Callable[[], str | None] | None = None
|
||||
auto_args: bool = True
|
||||
arg_metadata: dict[str, str | dict[str, Any]] = Field(default_factory=dict)
|
||||
|
||||
_context: ExecutionContext | None = PrivateAttr(default=None)
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
def parse_args(self, raw_args: list[str] | str) -> tuple[tuple, dict]:
|
||||
if self.custom_parser:
|
||||
def parse_args(
|
||||
self, raw_args: list[str] | str, from_validate: bool = False
|
||||
) -> tuple[tuple, dict]:
|
||||
if callable(self.custom_parser):
|
||||
if isinstance(raw_args, str):
|
||||
raw_args = shlex.split(raw_args)
|
||||
try:
|
||||
raw_args = shlex.split(raw_args)
|
||||
except ValueError:
|
||||
logger.warning(
|
||||
"[Command:%s] Failed to split arguments: %s",
|
||||
self.key,
|
||||
raw_args,
|
||||
)
|
||||
return ((), {})
|
||||
return self.custom_parser(raw_args)
|
||||
|
||||
if isinstance(raw_args, str):
|
||||
raw_args = shlex.split(raw_args)
|
||||
return self.arg_parser.parse_args_split(raw_args)
|
||||
try:
|
||||
raw_args = shlex.split(raw_args)
|
||||
except ValueError:
|
||||
logger.warning(
|
||||
"[Command:%s] Failed to split arguments: %s",
|
||||
self.key,
|
||||
raw_args,
|
||||
)
|
||||
return ((), {})
|
||||
return self.arg_parser.parse_args_split(raw_args, from_validate=from_validate)
|
||||
|
||||
@field_validator("action", mode="before")
|
||||
@classmethod
|
||||
@ -151,11 +176,24 @@ class Command(BaseModel):
|
||||
return ensure_async(action)
|
||||
raise TypeError("Action must be a callable or an instance of BaseAction")
|
||||
|
||||
def get_argument_definitions(self) -> list[dict[str, Any]]:
|
||||
if self.arguments:
|
||||
return self.arguments
|
||||
elif callable(self.argument_config):
|
||||
self.argument_config(self.arg_parser)
|
||||
elif self.auto_args:
|
||||
if isinstance(self.action, BaseAction):
|
||||
infer_target, maybe_metadata = self.action.get_infer_target()
|
||||
# merge metadata with the action's metadata if not already in self.arg_metadata
|
||||
if maybe_metadata:
|
||||
self.arg_metadata = {**maybe_metadata, **self.arg_metadata}
|
||||
return infer_args_from_func(infer_target, self.arg_metadata)
|
||||
elif callable(self.action):
|
||||
return infer_args_from_func(self.action, self.arg_metadata)
|
||||
return []
|
||||
|
||||
def model_post_init(self, _: Any) -> None:
|
||||
"""Post-initialization to set up the action and hooks."""
|
||||
if isinstance(self.arg_parser, CommandArgumentParser):
|
||||
self.arg_parser.command_description = self.description
|
||||
|
||||
if self.retry and isinstance(self.action, Action):
|
||||
self.action.enable_retry()
|
||||
elif self.retry_policy and isinstance(self.action, Action):
|
||||
@ -177,26 +215,8 @@ class Command(BaseModel):
|
||||
if self.logging_hooks and isinstance(self.action, BaseAction):
|
||||
register_debug_hooks(self.action.hooks)
|
||||
|
||||
if self.requires_input is None and self.detect_requires_input:
|
||||
self.requires_input = True
|
||||
self.hidden = True
|
||||
elif self.requires_input is None:
|
||||
self.requires_input = False
|
||||
|
||||
@cached_property
|
||||
def detect_requires_input(self) -> bool:
|
||||
"""Detect if the action requires input based on its type."""
|
||||
if isinstance(self.action, BaseIOAction):
|
||||
return True
|
||||
elif isinstance(self.action, ChainedAction):
|
||||
return (
|
||||
isinstance(self.action.actions[0], BaseIOAction)
|
||||
if self.action.actions
|
||||
else False
|
||||
)
|
||||
elif isinstance(self.action, ActionGroup):
|
||||
return any(isinstance(action, BaseIOAction) for action in self.action.actions)
|
||||
return False
|
||||
for arg_def in self.get_argument_definitions():
|
||||
self.arg_parser.add_argument(*arg_def.pop("flags"), **arg_def)
|
||||
|
||||
def _inject_options_manager(self) -> None:
|
||||
"""Inject the options manager into the action if applicable."""
|
||||
@ -290,7 +310,7 @@ class Command(BaseModel):
|
||||
|
||||
def show_help(self) -> bool:
|
||||
"""Display the help message for the command."""
|
||||
if self.custom_help:
|
||||
if callable(self.custom_help):
|
||||
output = self.custom_help()
|
||||
if output:
|
||||
console.print(output)
|
||||
|
@ -98,7 +98,6 @@ class RawCommand(BaseModel):
|
||||
retry: bool = False
|
||||
retry_all: bool = False
|
||||
retry_policy: RetryPolicy = Field(default_factory=RetryPolicy)
|
||||
requires_input: bool | None = None
|
||||
hidden: bool = False
|
||||
help_text: str = ""
|
||||
|
||||
|
123
falyx/falyx.py
123
falyx/falyx.py
@ -58,11 +58,12 @@ from falyx.execution_registry import ExecutionRegistry as er
|
||||
from falyx.hook_manager import Hook, HookManager, HookType
|
||||
from falyx.logger import logger
|
||||
from falyx.options_manager import OptionsManager
|
||||
from falyx.parsers import get_arg_parsers
|
||||
from falyx.parsers import CommandArgumentParser, get_arg_parsers
|
||||
from falyx.protocols import ArgParserProtocol
|
||||
from falyx.retry import RetryPolicy
|
||||
from falyx.signals import BackSignal, CancelSignal, HelpSignal, QuitSignal
|
||||
from falyx.themes import OneColors, get_nord_theme
|
||||
from falyx.utils import CaseInsensitiveDict, _noop, chunks, get_program_invocation
|
||||
from falyx.utils import CaseInsensitiveDict, _noop, chunks
|
||||
from falyx.version import __version__
|
||||
|
||||
|
||||
@ -89,7 +90,7 @@ class CommandValidator(Validator):
|
||||
if not choice:
|
||||
raise ValidationError(
|
||||
message=self.error_message,
|
||||
cursor_position=document.get_end_of_document_position(),
|
||||
cursor_position=len(text),
|
||||
)
|
||||
|
||||
|
||||
@ -110,6 +111,8 @@ class Falyx:
|
||||
- Submenu nesting and action chaining
|
||||
- History tracking, help generation, and run key execution modes
|
||||
- Seamless CLI argument parsing and integration via argparse
|
||||
- Declarative option management with OptionsManager
|
||||
- Command level argument parsing and validation
|
||||
- Extensible with user-defined hooks, bottom bars, and custom layouts
|
||||
|
||||
Args:
|
||||
@ -125,7 +128,7 @@ class Falyx:
|
||||
never_prompt (bool): Seed default for `OptionsManager["never_prompt"]`
|
||||
force_confirm (bool): Seed default for `OptionsManager["force_confirm"]`
|
||||
cli_args (Namespace | None): Parsed CLI arguments, usually from argparse.
|
||||
options (OptionsManager | None): Declarative option mappings.
|
||||
options (OptionsManager | None): Declarative option mappings for global state.
|
||||
custom_table (Callable[[Falyx], Table] | Table | None): Custom menu table
|
||||
generator.
|
||||
|
||||
@ -157,8 +160,9 @@ class Falyx:
|
||||
force_confirm: bool = False,
|
||||
cli_args: Namespace | None = None,
|
||||
options: OptionsManager | None = None,
|
||||
render_menu: Callable[["Falyx"], None] | None = None,
|
||||
custom_table: Callable[["Falyx"], Table] | Table | None = None,
|
||||
render_menu: Callable[[Falyx], None] | None = None,
|
||||
custom_table: Callable[[Falyx], Table] | Table | None = None,
|
||||
hide_menu_table: bool = False,
|
||||
) -> None:
|
||||
"""Initializes the Falyx object."""
|
||||
self.title: str | Markdown = title
|
||||
@ -182,8 +186,9 @@ class Falyx:
|
||||
self._never_prompt: bool = never_prompt
|
||||
self._force_confirm: bool = force_confirm
|
||||
self.cli_args: Namespace | None = cli_args
|
||||
self.render_menu: Callable[["Falyx"], None] | None = render_menu
|
||||
self.custom_table: Callable[["Falyx"], Table] | Table | None = custom_table
|
||||
self.render_menu: Callable[[Falyx], None] | None = render_menu
|
||||
self.custom_table: Callable[[Falyx], Table] | Table | None = custom_table
|
||||
self.hide_menu_table: bool = hide_menu_table
|
||||
self.validate_options(cli_args, options)
|
||||
self._prompt_session: PromptSession | None = None
|
||||
self.mode = FalyxMode.MENU
|
||||
@ -286,8 +291,6 @@ class Falyx:
|
||||
|
||||
for command in self.commands.values():
|
||||
help_text = command.help_text or command.description
|
||||
if command.requires_input:
|
||||
help_text += " [dim](requires input)[/dim]"
|
||||
table.add_row(
|
||||
f"[{command.style}]{command.key}[/]",
|
||||
", ".join(command.aliases) if command.aliases else "",
|
||||
@ -524,7 +527,7 @@ class Falyx:
|
||||
key: str = "X",
|
||||
description: str = "Exit",
|
||||
aliases: list[str] | None = None,
|
||||
action: Callable[[], Any] | None = None,
|
||||
action: Callable[..., Any] | None = None,
|
||||
style: str = OneColors.DARK_RED,
|
||||
confirm: bool = False,
|
||||
confirm_message: str = "Are you sure?",
|
||||
@ -578,13 +581,14 @@ class Falyx:
|
||||
self,
|
||||
key: str,
|
||||
description: str,
|
||||
action: BaseAction | Callable[[], Any],
|
||||
action: BaseAction | Callable[..., Any],
|
||||
*,
|
||||
args: tuple = (),
|
||||
kwargs: dict[str, Any] | None = None,
|
||||
hidden: bool = False,
|
||||
aliases: list[str] | None = None,
|
||||
help_text: str = "",
|
||||
help_epilogue: str = "",
|
||||
style: str = OneColors.WHITE,
|
||||
confirm: bool = False,
|
||||
confirm_message: str = "Are you sure?",
|
||||
@ -605,10 +609,33 @@ class Falyx:
|
||||
retry: bool = False,
|
||||
retry_all: bool = False,
|
||||
retry_policy: RetryPolicy | None = None,
|
||||
requires_input: bool | None = None,
|
||||
arg_parser: CommandArgumentParser | None = None,
|
||||
arguments: list[dict[str, Any]] | None = None,
|
||||
argument_config: Callable[[CommandArgumentParser], None] | None = None,
|
||||
custom_parser: ArgParserProtocol | None = None,
|
||||
custom_help: Callable[[], str | None] | None = None,
|
||||
auto_args: bool = True,
|
||||
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||
) -> Command:
|
||||
"""Adds an command to the menu, preventing duplicates."""
|
||||
self._validate_command_key(key)
|
||||
|
||||
if arg_parser:
|
||||
if not isinstance(arg_parser, CommandArgumentParser):
|
||||
raise NotAFalyxError(
|
||||
"arg_parser must be an instance of CommandArgumentParser."
|
||||
)
|
||||
arg_parser = arg_parser
|
||||
else:
|
||||
arg_parser = CommandArgumentParser(
|
||||
command_key=key,
|
||||
command_description=description,
|
||||
command_style=style,
|
||||
help_text=help_text,
|
||||
help_epilogue=help_epilogue,
|
||||
aliases=aliases,
|
||||
)
|
||||
|
||||
command = Command(
|
||||
key=key,
|
||||
description=description,
|
||||
@ -618,6 +645,7 @@ class Falyx:
|
||||
hidden=hidden,
|
||||
aliases=aliases if aliases else [],
|
||||
help_text=help_text,
|
||||
help_epilogue=help_epilogue,
|
||||
style=style,
|
||||
confirm=confirm,
|
||||
confirm_message=confirm_message,
|
||||
@ -632,8 +660,14 @@ class Falyx:
|
||||
retry=retry,
|
||||
retry_all=retry_all,
|
||||
retry_policy=retry_policy or RetryPolicy(),
|
||||
requires_input=requires_input,
|
||||
options_manager=self.options,
|
||||
arg_parser=arg_parser,
|
||||
arguments=arguments or [],
|
||||
argument_config=argument_config,
|
||||
custom_parser=custom_parser,
|
||||
custom_help=custom_help,
|
||||
auto_args=auto_args,
|
||||
arg_metadata=arg_metadata or {},
|
||||
)
|
||||
|
||||
if hooks:
|
||||
@ -715,7 +749,10 @@ class Falyx:
|
||||
"""
|
||||
args = ()
|
||||
kwargs: dict[str, Any] = {}
|
||||
choice, *input_args = shlex.split(raw_choices)
|
||||
try:
|
||||
choice, *input_args = shlex.split(raw_choices)
|
||||
except ValueError:
|
||||
return False, None, args, kwargs
|
||||
is_preview, choice = self.parse_preview_command(choice)
|
||||
if is_preview and not choice and self.help_command:
|
||||
is_preview = False
|
||||
@ -730,24 +767,27 @@ class Falyx:
|
||||
|
||||
choice = choice.upper()
|
||||
name_map = self._name_map
|
||||
if choice in name_map:
|
||||
if name_map.get(choice):
|
||||
if not from_validate:
|
||||
logger.info("Command '%s' selected.", choice)
|
||||
if input_args and name_map[choice].arg_parser:
|
||||
try:
|
||||
args, kwargs = name_map[choice].parse_args(input_args)
|
||||
except CommandArgumentError as error:
|
||||
if not from_validate:
|
||||
if not name_map[choice].show_help():
|
||||
self.console.print(
|
||||
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
|
||||
)
|
||||
else:
|
||||
name_map[choice].show_help()
|
||||
raise ValidationError(
|
||||
message=str(error), cursor_position=len(raw_choices)
|
||||
if is_preview:
|
||||
return True, name_map[choice], args, kwargs
|
||||
try:
|
||||
args, kwargs = name_map[choice].parse_args(input_args, from_validate)
|
||||
except CommandArgumentError as error:
|
||||
if not from_validate:
|
||||
if not name_map[choice].show_help():
|
||||
self.console.print(
|
||||
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
|
||||
)
|
||||
return is_preview, None, args, kwargs
|
||||
else:
|
||||
name_map[choice].show_help()
|
||||
raise ValidationError(
|
||||
message=str(error), cursor_position=len(raw_choices)
|
||||
)
|
||||
return is_preview, None, args, kwargs
|
||||
except HelpSignal:
|
||||
return True, None, args, kwargs
|
||||
return is_preview, name_map[choice], args, kwargs
|
||||
|
||||
prefix_matches = [cmd for key, cmd in name_map.items() if key.startswith(choice)]
|
||||
@ -804,15 +844,6 @@ class Falyx:
|
||||
await selected_command.preview()
|
||||
return True
|
||||
|
||||
if selected_command.requires_input:
|
||||
program = get_program_invocation()
|
||||
self.console.print(
|
||||
f"[{OneColors.LIGHT_YELLOW}]⚠️ Command '{selected_command.key}' requires"
|
||||
f" input and must be run via [{OneColors.MAGENTA}]'{program} run"
|
||||
f"'[{OneColors.LIGHT_YELLOW}] with proper piping or arguments.[/]"
|
||||
)
|
||||
return True
|
||||
|
||||
self.last_run_command = selected_command
|
||||
|
||||
if selected_command == self.exit_command:
|
||||
@ -823,7 +854,6 @@ class Falyx:
|
||||
context.start_timer()
|
||||
try:
|
||||
await self.hooks.trigger(HookType.BEFORE, context)
|
||||
print(args, kwargs)
|
||||
result = await selected_command(*args, **kwargs)
|
||||
context.result = result
|
||||
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
||||
@ -945,10 +975,11 @@ class Falyx:
|
||||
self.print_message(self.welcome_message)
|
||||
try:
|
||||
while True:
|
||||
if callable(self.render_menu):
|
||||
self.render_menu(self)
|
||||
else:
|
||||
self.console.print(self.table, justify="center")
|
||||
if not self.hide_menu_table:
|
||||
if callable(self.render_menu):
|
||||
self.render_menu(self)
|
||||
else:
|
||||
self.console.print(self.table, justify="center")
|
||||
try:
|
||||
task = asyncio.create_task(self.process_command())
|
||||
should_continue = await task
|
||||
@ -964,8 +995,6 @@ class Falyx:
|
||||
logger.info("BackSignal received.")
|
||||
except CancelSignal:
|
||||
logger.info("CancelSignal received.")
|
||||
except HelpSignal:
|
||||
logger.info("HelpSignal received.")
|
||||
finally:
|
||||
logger.info("Exiting menu: %s", self.get_title())
|
||||
if self.exit_message:
|
||||
@ -995,7 +1024,7 @@ class Falyx:
|
||||
sys.exit(0)
|
||||
|
||||
if self.cli_args.command == "version" or self.cli_args.version:
|
||||
self.console.print(f"[{OneColors.GREEN_b}]Falyx CLI v{__version__}[/]")
|
||||
self.console.print(f"[{OneColors.BLUE_b}]Falyx CLI v{__version__}[/]")
|
||||
sys.exit(0)
|
||||
|
||||
if self.cli_args.command == "preview":
|
||||
|
@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable, Dict, List, Optional, Union
|
||||
from typing import Awaitable, Callable, Union
|
||||
|
||||
from falyx.context import ExecutionContext
|
||||
from falyx.logger import logger
|
||||
@ -24,7 +24,7 @@ class HookType(Enum):
|
||||
ON_TEARDOWN = "on_teardown"
|
||||
|
||||
@classmethod
|
||||
def choices(cls) -> List[HookType]:
|
||||
def choices(cls) -> list[HookType]:
|
||||
"""Return a list of all hook type choices."""
|
||||
return list(cls)
|
||||
|
||||
@ -37,16 +37,17 @@ class HookManager:
|
||||
"""HookManager"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._hooks: Dict[HookType, List[Hook]] = {
|
||||
self._hooks: dict[HookType, list[Hook]] = {
|
||||
hook_type: [] for hook_type in HookType
|
||||
}
|
||||
|
||||
def register(self, hook_type: HookType, hook: Hook):
|
||||
if hook_type not in HookType:
|
||||
raise ValueError(f"Unsupported hook type: {hook_type}")
|
||||
def register(self, hook_type: HookType | str, hook: Hook):
|
||||
"""Raises ValueError if the hook type is not supported."""
|
||||
if not isinstance(hook_type, HookType):
|
||||
hook_type = HookType(hook_type)
|
||||
self._hooks[hook_type].append(hook)
|
||||
|
||||
def clear(self, hook_type: Optional[HookType] = None):
|
||||
def clear(self, hook_type: HookType | None = None):
|
||||
if hook_type:
|
||||
self._hooks[hook_type] = []
|
||||
else:
|
||||
|
@ -33,7 +33,7 @@ class MenuOptionMap(CaseInsensitiveDict):
|
||||
and special signal entries like Quit and Back.
|
||||
"""
|
||||
|
||||
RESERVED_KEYS = {"Q", "B"}
|
||||
RESERVED_KEYS = {"B", "X"}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -49,14 +49,14 @@ class MenuOptionMap(CaseInsensitiveDict):
|
||||
def _inject_reserved_defaults(self):
|
||||
from falyx.action import SignalAction
|
||||
|
||||
self._add_reserved(
|
||||
"Q",
|
||||
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
|
||||
)
|
||||
self._add_reserved(
|
||||
"B",
|
||||
MenuOption("Back", SignalAction("Back", BackSignal()), OneColors.DARK_YELLOW),
|
||||
)
|
||||
self._add_reserved(
|
||||
"X",
|
||||
MenuOption("Exit", SignalAction("Quit", QuitSignal()), OneColors.DARK_RED),
|
||||
)
|
||||
|
||||
def _add_reserved(self, key: str, option: MenuOption) -> None:
|
||||
"""Add a reserved key, bypassing validation."""
|
||||
@ -78,8 +78,20 @@ class MenuOptionMap(CaseInsensitiveDict):
|
||||
raise ValueError(f"Cannot delete reserved option '{key}'.")
|
||||
super().__delitem__(key)
|
||||
|
||||
def update(self, other=None, **kwargs):
|
||||
"""Update the selection options with another dictionary."""
|
||||
if other:
|
||||
for key, option in other.items():
|
||||
if not isinstance(option, MenuOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
self[key] = option
|
||||
for key, option in kwargs.items():
|
||||
if not isinstance(option, MenuOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
self[key] = option
|
||||
|
||||
def items(self, include_reserved: bool = True):
|
||||
for k, v in super().items():
|
||||
if not include_reserved and k in self.RESERVED_KEYS:
|
||||
for key, option in super().items():
|
||||
if not include_reserved and key in self.RESERVED_KEYS:
|
||||
continue
|
||||
yield k, v
|
||||
yield key, option
|
||||
|
0
falyx/parsers/.pytyped
Normal file
0
falyx/parsers/.pytyped
Normal file
17
falyx/parsers/__init__.py
Normal file
17
falyx/parsers/__init__.py
Normal file
@ -0,0 +1,17 @@
|
||||
"""
|
||||
Falyx CLI Framework
|
||||
|
||||
Copyright (c) 2025 rtj.dev LLC.
|
||||
Licensed under the MIT License. See LICENSE file for details.
|
||||
"""
|
||||
|
||||
from .argparse import Argument, ArgumentAction, CommandArgumentParser
|
||||
from .parsers import FalyxParsers, get_arg_parsers
|
||||
|
||||
__all__ = [
|
||||
"Argument",
|
||||
"ArgumentAction",
|
||||
"CommandArgumentParser",
|
||||
"get_arg_parsers",
|
||||
"FalyxParsers",
|
||||
]
|
@ -1,11 +1,14 @@
|
||||
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
|
||||
from __future__ import annotations
|
||||
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Iterable
|
||||
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.markup import escape
|
||||
from rich.text import Text
|
||||
|
||||
from falyx.exceptions import CommandArgumentError
|
||||
from falyx.signals import HelpSignal
|
||||
@ -22,6 +25,15 @@ class ArgumentAction(Enum):
|
||||
COUNT = "count"
|
||||
HELP = "help"
|
||||
|
||||
@classmethod
|
||||
def choices(cls) -> list[ArgumentAction]:
|
||||
"""Return a list of all argument actions."""
|
||||
return list(cls)
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""Return the string representation of the argument action."""
|
||||
return self.value
|
||||
|
||||
|
||||
@dataclass
|
||||
class Argument:
|
||||
@ -40,6 +52,74 @@ class Argument:
|
||||
nargs: int | str = 1 # int, '?', '*', '+'
|
||||
positional: bool = False # True if no leading - or -- in flags
|
||||
|
||||
def get_positional_text(self) -> str:
|
||||
"""Get the positional text for the argument."""
|
||||
text = ""
|
||||
if self.positional:
|
||||
if self.choices:
|
||||
text = f"{{{','.join([str(choice) for choice in self.choices])}}}"
|
||||
else:
|
||||
text = self.dest
|
||||
return text
|
||||
|
||||
def get_choice_text(self) -> str:
|
||||
"""Get the choice text for the argument."""
|
||||
choice_text = ""
|
||||
if self.choices:
|
||||
choice_text = f"{{{','.join([str(choice) for choice in self.choices])}}}"
|
||||
elif (
|
||||
self.action
|
||||
in (
|
||||
ArgumentAction.STORE,
|
||||
ArgumentAction.APPEND,
|
||||
ArgumentAction.EXTEND,
|
||||
)
|
||||
and not self.positional
|
||||
):
|
||||
choice_text = self.dest.upper()
|
||||
elif self.action in (
|
||||
ArgumentAction.STORE,
|
||||
ArgumentAction.APPEND,
|
||||
ArgumentAction.EXTEND,
|
||||
) or isinstance(self.nargs, str):
|
||||
choice_text = self.dest
|
||||
|
||||
if self.nargs == "?":
|
||||
choice_text = f"[{choice_text}]"
|
||||
elif self.nargs == "*":
|
||||
choice_text = f"[{choice_text} ...]"
|
||||
elif self.nargs == "+":
|
||||
choice_text = f"{choice_text} [{choice_text} ...]"
|
||||
return choice_text
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, Argument):
|
||||
return False
|
||||
return (
|
||||
self.flags == other.flags
|
||||
and self.dest == other.dest
|
||||
and self.action == other.action
|
||||
and self.type == other.type
|
||||
and self.choices == other.choices
|
||||
and self.required == other.required
|
||||
and self.nargs == other.nargs
|
||||
and self.positional == other.positional
|
||||
)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(
|
||||
(
|
||||
tuple(self.flags),
|
||||
self.dest,
|
||||
self.action,
|
||||
self.type,
|
||||
tuple(self.choices or []),
|
||||
self.required,
|
||||
self.nargs,
|
||||
self.positional,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class CommandArgumentParser:
|
||||
"""
|
||||
@ -61,10 +141,25 @@ class CommandArgumentParser:
|
||||
- Render Help using Rich library.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
command_key: str = "",
|
||||
command_description: str = "",
|
||||
command_style: str = "bold",
|
||||
help_text: str = "",
|
||||
help_epilogue: str = "",
|
||||
aliases: list[str] | None = None,
|
||||
) -> None:
|
||||
"""Initialize the CommandArgumentParser."""
|
||||
self.command_description: str = ""
|
||||
self.command_key: str = command_key
|
||||
self.command_description: str = command_description
|
||||
self.command_style: str = command_style
|
||||
self.help_text: str = help_text
|
||||
self.help_epilogue: str = help_epilogue
|
||||
self.aliases: list[str] = aliases or []
|
||||
self._arguments: list[Argument] = []
|
||||
self._positional: list[Argument] = []
|
||||
self._keyword: list[Argument] = []
|
||||
self._flag_map: dict[str, Argument] = {}
|
||||
self._dest_set: set[str] = set()
|
||||
self._add_help()
|
||||
@ -73,10 +168,10 @@ class CommandArgumentParser:
|
||||
def _add_help(self):
|
||||
"""Add help argument to the parser."""
|
||||
self.add_argument(
|
||||
"--help",
|
||||
"-h",
|
||||
"--help",
|
||||
action=ArgumentAction.HELP,
|
||||
help="Show this help message and exit.",
|
||||
help="Show this help message.",
|
||||
dest="help",
|
||||
)
|
||||
|
||||
@ -304,10 +399,31 @@ class CommandArgumentParser:
|
||||
)
|
||||
self._flag_map[flag] = argument
|
||||
self._arguments.append(argument)
|
||||
if positional:
|
||||
self._positional.append(argument)
|
||||
else:
|
||||
self._keyword.append(argument)
|
||||
|
||||
def get_argument(self, dest: str) -> Argument | None:
|
||||
return next((a for a in self._arguments if a.dest == dest), None)
|
||||
|
||||
def to_definition_list(self) -> list[dict[str, Any]]:
|
||||
defs = []
|
||||
for arg in self._arguments:
|
||||
defs.append(
|
||||
{
|
||||
"flags": arg.flags,
|
||||
"dest": arg.dest,
|
||||
"action": arg.action,
|
||||
"type": arg.type,
|
||||
"choices": arg.choices,
|
||||
"required": arg.required,
|
||||
"nargs": arg.nargs,
|
||||
"positional": arg.positional,
|
||||
}
|
||||
)
|
||||
return defs
|
||||
|
||||
def _consume_nargs(
|
||||
self, args: list[str], start: int, spec: Argument
|
||||
) -> tuple[list[str], int]:
|
||||
@ -405,7 +521,9 @@ class CommandArgumentParser:
|
||||
|
||||
return i
|
||||
|
||||
def parse_args(self, args: list[str] | None = None) -> dict[str, Any]:
|
||||
def parse_args(
|
||||
self, args: list[str] | None = None, from_validate: bool = False
|
||||
) -> dict[str, Any]:
|
||||
"""Parse Falyx Command arguments."""
|
||||
if args is None:
|
||||
args = []
|
||||
@ -423,7 +541,8 @@ class CommandArgumentParser:
|
||||
action = spec.action
|
||||
|
||||
if action == ArgumentAction.HELP:
|
||||
self.render_help()
|
||||
if not from_validate:
|
||||
self.render_help()
|
||||
raise HelpSignal()
|
||||
elif action == ArgumentAction.STORE_TRUE:
|
||||
result[spec.dest] = True
|
||||
@ -550,13 +669,15 @@ class CommandArgumentParser:
|
||||
result.pop("help", None)
|
||||
return result
|
||||
|
||||
def parse_args_split(self, args: list[str]) -> tuple[tuple[Any, ...], dict[str, Any]]:
|
||||
def parse_args_split(
|
||||
self, args: list[str], from_validate: bool = False
|
||||
) -> tuple[tuple[Any, ...], dict[str, Any]]:
|
||||
"""
|
||||
Returns:
|
||||
tuple[args, kwargs] - Positional arguments in defined order,
|
||||
followed by keyword argument mapping.
|
||||
"""
|
||||
parsed = self.parse_args(args)
|
||||
parsed = self.parse_args(args, from_validate)
|
||||
args_list = []
|
||||
kwargs_dict = {}
|
||||
for arg in self._arguments:
|
||||
@ -568,20 +689,74 @@ class CommandArgumentParser:
|
||||
kwargs_dict[arg.dest] = parsed[arg.dest]
|
||||
return tuple(args_list), kwargs_dict
|
||||
|
||||
def render_help(self):
|
||||
table = Table(title=f"{self.command_description} Help")
|
||||
table.add_column("Flags")
|
||||
table.add_column("Help")
|
||||
for arg in self._arguments:
|
||||
if arg.dest == "help":
|
||||
continue
|
||||
flag_str = ", ".join(arg.flags) if not arg.positional else arg.dest
|
||||
table.add_row(flag_str, arg.help or "")
|
||||
table.add_section()
|
||||
arg = self.get_argument("help")
|
||||
flag_str = ", ".join(arg.flags) if not arg.positional else arg.dest
|
||||
table.add_row(flag_str, arg.help or "")
|
||||
self.console.print(table)
|
||||
def render_help(self) -> None:
|
||||
# Options
|
||||
# Add all keyword arguments to the options list
|
||||
options_list = []
|
||||
for arg in self._keyword:
|
||||
choice_text = arg.get_choice_text()
|
||||
if choice_text:
|
||||
options_list.extend([f"[{arg.flags[0]} {choice_text}]"])
|
||||
else:
|
||||
options_list.extend([f"[{arg.flags[0]}]"])
|
||||
|
||||
# Add positional arguments to the options list
|
||||
for arg in self._positional:
|
||||
choice_text = arg.get_choice_text()
|
||||
if isinstance(arg.nargs, int):
|
||||
choice_text = " ".join([choice_text] * arg.nargs)
|
||||
options_list.append(escape(choice_text))
|
||||
|
||||
options_text = " ".join(options_list)
|
||||
command_keys = " | ".join(
|
||||
[f"[{self.command_style}]{self.command_key}[/{self.command_style}]"]
|
||||
+ [
|
||||
f"[{self.command_style}]{alias}[/{self.command_style}]"
|
||||
for alias in self.aliases
|
||||
]
|
||||
)
|
||||
|
||||
usage = f"usage: {command_keys} {options_text}"
|
||||
self.console.print(f"[bold]{usage}[/bold]\n")
|
||||
|
||||
# Description
|
||||
if self.help_text:
|
||||
self.console.print(self.help_text + "\n")
|
||||
|
||||
# Arguments
|
||||
if self._arguments:
|
||||
if self._positional:
|
||||
self.console.print("[bold]positional:[/bold]")
|
||||
for arg in self._positional:
|
||||
flags = arg.get_positional_text()
|
||||
arg_line = Text(f" {flags:<30} ")
|
||||
help_text = arg.help or ""
|
||||
arg_line.append(help_text)
|
||||
self.console.print(arg_line)
|
||||
self.console.print("[bold]options:[/bold]")
|
||||
for arg in self._keyword:
|
||||
flags = ", ".join(arg.flags)
|
||||
flags_choice = f"{flags} {arg.get_choice_text()}"
|
||||
arg_line = Text(f" {flags_choice:<30} ")
|
||||
help_text = arg.help or ""
|
||||
arg_line.append(help_text)
|
||||
self.console.print(arg_line)
|
||||
|
||||
# Epilogue
|
||||
if self.help_epilogue:
|
||||
self.console.print("\n" + self.help_epilogue, style="dim")
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, CommandArgumentParser):
|
||||
return False
|
||||
|
||||
def sorted_args(parser):
|
||||
return sorted(parser._arguments, key=lambda a: a.dest)
|
||||
|
||||
return sorted_args(self) == sorted_args(other)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(tuple(sorted(self._arguments, key=lambda a: a.dest)))
|
||||
|
||||
def __str__(self) -> str:
|
||||
positional = sum(arg.positional for arg in self._arguments)
|
@ -114,7 +114,7 @@ def get_arg_parsers(
|
||||
help="Skip confirmation prompts",
|
||||
)
|
||||
|
||||
run_group.add_argument(
|
||||
run_parser.add_argument(
|
||||
"command_args",
|
||||
nargs=REMAINDER,
|
||||
help="Arguments to pass to the command (if applicable)",
|
74
falyx/parsers/signature.py
Normal file
74
falyx/parsers/signature.py
Normal file
@ -0,0 +1,74 @@
|
||||
import inspect
|
||||
from typing import Any, Callable
|
||||
|
||||
from falyx.logger import logger
|
||||
|
||||
|
||||
def infer_args_from_func(
|
||||
func: Callable[[Any], Any] | None,
|
||||
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Infer argument definitions from a callable's signature.
|
||||
Returns a list of kwargs suitable for CommandArgumentParser.add_argument.
|
||||
"""
|
||||
if not callable(func):
|
||||
logger.debug("Provided argument is not callable: %s", func)
|
||||
return []
|
||||
arg_metadata = arg_metadata or {}
|
||||
signature = inspect.signature(func)
|
||||
arg_defs = []
|
||||
|
||||
for name, param in signature.parameters.items():
|
||||
raw_metadata = arg_metadata.get(name, {})
|
||||
metadata = (
|
||||
{"help": raw_metadata} if isinstance(raw_metadata, str) else raw_metadata
|
||||
)
|
||||
|
||||
if param.kind not in (
|
||||
inspect.Parameter.POSITIONAL_ONLY,
|
||||
inspect.Parameter.POSITIONAL_OR_KEYWORD,
|
||||
inspect.Parameter.KEYWORD_ONLY,
|
||||
):
|
||||
continue
|
||||
|
||||
arg_type = (
|
||||
param.annotation if param.annotation is not inspect.Parameter.empty else str
|
||||
)
|
||||
default = param.default if param.default is not inspect.Parameter.empty else None
|
||||
is_required = param.default is inspect.Parameter.empty
|
||||
if is_required:
|
||||
flags = [f"{name.replace('_', '-')}"]
|
||||
else:
|
||||
flags = [f"--{name.replace('_', '-')}"]
|
||||
action = "store"
|
||||
nargs: int | str = 1
|
||||
|
||||
if arg_type is bool:
|
||||
if param.default is False:
|
||||
action = "store_true"
|
||||
else:
|
||||
action = "store_false"
|
||||
|
||||
if arg_type is list:
|
||||
action = "append"
|
||||
if is_required:
|
||||
nargs = "+"
|
||||
else:
|
||||
nargs = "*"
|
||||
|
||||
arg_defs.append(
|
||||
{
|
||||
"flags": flags,
|
||||
"dest": name,
|
||||
"type": arg_type,
|
||||
"default": default,
|
||||
"required": is_required,
|
||||
"nargs": nargs,
|
||||
"action": action,
|
||||
"help": metadata.get("help", ""),
|
||||
"choices": metadata.get("choices"),
|
||||
}
|
||||
)
|
||||
|
||||
return arg_defs
|
28
falyx/parsers/utils.py
Normal file
28
falyx/parsers/utils.py
Normal file
@ -0,0 +1,28 @@
|
||||
from typing import Any
|
||||
|
||||
from falyx import logger
|
||||
from falyx.parsers.signature import infer_args_from_func
|
||||
|
||||
|
||||
def same_argument_definitions(
|
||||
actions: list[Any],
|
||||
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||
) -> list[dict[str, Any]] | None:
|
||||
from falyx.action.action import BaseAction
|
||||
|
||||
arg_sets = []
|
||||
for action in actions:
|
||||
if isinstance(action, BaseAction):
|
||||
infer_target, _ = action.get_infer_target()
|
||||
arg_defs = infer_args_from_func(infer_target, arg_metadata)
|
||||
elif callable(action):
|
||||
arg_defs = infer_args_from_func(action, arg_metadata)
|
||||
else:
|
||||
logger.debug("Auto args unsupported for action: %s", action)
|
||||
return None
|
||||
arg_sets.append(arg_defs)
|
||||
|
||||
first = arg_sets[0]
|
||||
if all(arg_set == first for arg_set in arg_sets[1:]):
|
||||
return first
|
||||
return None
|
@ -10,7 +10,7 @@ from rich.markup import escape
|
||||
from rich.table import Table
|
||||
|
||||
from falyx.themes import OneColors
|
||||
from falyx.utils import chunks
|
||||
from falyx.utils import CaseInsensitiveDict, chunks
|
||||
from falyx.validators import int_range_validator, key_validator
|
||||
|
||||
|
||||
@ -32,6 +32,62 @@ class SelectionOption:
|
||||
return f"[{OneColors.WHITE}]{key}[/] [{self.style}]{self.description}[/]"
|
||||
|
||||
|
||||
class SelectionOptionMap(CaseInsensitiveDict):
|
||||
"""
|
||||
Manages selection options including validation and reserved key protection.
|
||||
"""
|
||||
|
||||
RESERVED_KEYS: set[str] = set()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
options: dict[str, SelectionOption] | None = None,
|
||||
allow_reserved: bool = False,
|
||||
):
|
||||
super().__init__()
|
||||
self.allow_reserved = allow_reserved
|
||||
if options:
|
||||
self.update(options)
|
||||
|
||||
def _add_reserved(self, key: str, option: SelectionOption) -> None:
|
||||
"""Add a reserved key, bypassing validation."""
|
||||
norm_key = key.upper()
|
||||
super().__setitem__(norm_key, option)
|
||||
|
||||
def __setitem__(self, key: str, option: SelectionOption) -> None:
|
||||
if not isinstance(option, SelectionOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
norm_key = key.upper()
|
||||
if norm_key in self.RESERVED_KEYS and not self.allow_reserved:
|
||||
raise ValueError(
|
||||
f"Key '{key}' is reserved and cannot be used in SelectionOptionMap."
|
||||
)
|
||||
super().__setitem__(norm_key, option)
|
||||
|
||||
def __delitem__(self, key: str) -> None:
|
||||
if key.upper() in self.RESERVED_KEYS and not self.allow_reserved:
|
||||
raise ValueError(f"Cannot delete reserved option '{key}'.")
|
||||
super().__delitem__(key)
|
||||
|
||||
def update(self, other=None, **kwargs):
|
||||
"""Update the selection options with another dictionary."""
|
||||
if other:
|
||||
for key, option in other.items():
|
||||
if not isinstance(option, SelectionOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
self[key] = option
|
||||
for key, option in kwargs.items():
|
||||
if not isinstance(option, SelectionOption):
|
||||
raise TypeError(f"Value for key '{key}' must be a SelectionOption.")
|
||||
self[key] = option
|
||||
|
||||
def items(self, include_reserved: bool = True):
|
||||
for k, v in super().items():
|
||||
if not include_reserved and k in self.RESERVED_KEYS:
|
||||
continue
|
||||
yield k, v
|
||||
|
||||
|
||||
def render_table_base(
|
||||
title: str,
|
||||
*,
|
||||
|
@ -1 +1 @@
|
||||
__version__ = "0.1.28"
|
||||
__version__ = "0.1.33"
|
||||
|
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "falyx"
|
||||
version = "0.1.28"
|
||||
version = "0.1.33"
|
||||
description = "Reliable and introspectable async CLI action framework."
|
||||
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
|
||||
license = "MIT"
|
||||
|
@ -56,102 +56,6 @@ def test_command_str():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"action_factory, expected_requires_input",
|
||||
[
|
||||
(lambda: Action(name="normal", action=dummy_action), False),
|
||||
(lambda: DummyInputAction(name="io"), True),
|
||||
(
|
||||
lambda: ChainedAction(name="chain", actions=[DummyInputAction(name="io")]),
|
||||
True,
|
||||
),
|
||||
(
|
||||
lambda: ActionGroup(name="group", actions=[DummyInputAction(name="io")]),
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_command_requires_input_detection(action_factory, expected_requires_input):
|
||||
action = action_factory()
|
||||
cmd = Command(key="TEST", description="Test Command", action=action)
|
||||
|
||||
assert cmd.requires_input == expected_requires_input
|
||||
if expected_requires_input:
|
||||
assert cmd.hidden is True
|
||||
else:
|
||||
assert cmd.hidden is False
|
||||
|
||||
|
||||
def test_requires_input_flag_detected_for_baseioaction():
|
||||
"""Command should automatically detect requires_input=True for BaseIOAction."""
|
||||
cmd = Command(
|
||||
key="X",
|
||||
description="Echo input",
|
||||
action=DummyInputAction(name="dummy"),
|
||||
)
|
||||
assert cmd.requires_input is True
|
||||
assert cmd.hidden is True
|
||||
|
||||
|
||||
def test_requires_input_manual_override():
|
||||
"""Command manually set requires_input=False should not auto-hide."""
|
||||
cmd = Command(
|
||||
key="Y",
|
||||
description="Custom input command",
|
||||
action=DummyInputAction(name="dummy"),
|
||||
requires_input=False,
|
||||
)
|
||||
assert cmd.requires_input is False
|
||||
assert cmd.hidden is False
|
||||
|
||||
|
||||
def test_default_command_does_not_require_input():
|
||||
"""Normal Command without IO Action should not require input."""
|
||||
cmd = Command(
|
||||
key="Z",
|
||||
description="Simple action",
|
||||
action=lambda: 42,
|
||||
)
|
||||
assert cmd.requires_input is False
|
||||
assert cmd.hidden is False
|
||||
|
||||
|
||||
def test_chain_requires_input():
|
||||
"""If first action in a chain requires input, the command should require input."""
|
||||
chain = ChainedAction(
|
||||
name="ChainWithInput",
|
||||
actions=[
|
||||
DummyInputAction(name="dummy"),
|
||||
Action(name="action1", action=lambda: 1),
|
||||
],
|
||||
)
|
||||
cmd = Command(
|
||||
key="A",
|
||||
description="Chain with input",
|
||||
action=chain,
|
||||
)
|
||||
assert cmd.requires_input is True
|
||||
assert cmd.hidden is True
|
||||
|
||||
|
||||
def test_group_requires_input():
|
||||
"""If any action in a group requires input, the command should require input."""
|
||||
group = ActionGroup(
|
||||
name="GroupWithInput",
|
||||
actions=[
|
||||
Action(name="action1", action=lambda: 1),
|
||||
DummyInputAction(name="dummy"),
|
||||
],
|
||||
)
|
||||
cmd = Command(
|
||||
key="B",
|
||||
description="Group with input",
|
||||
action=group,
|
||||
)
|
||||
assert cmd.requires_input is True
|
||||
assert cmd.hidden is True
|
||||
|
||||
|
||||
def test_enable_retry():
|
||||
"""Command should enable retry if action is an Action and retry is set to True."""
|
||||
cmd = Command(
|
||||
|
@ -1,7 +1,7 @@
|
||||
import pytest
|
||||
|
||||
from falyx.argparse import ArgumentAction, CommandArgumentParser
|
||||
from falyx.exceptions import CommandArgumentError
|
||||
from falyx.parsers import ArgumentAction, CommandArgumentParser
|
||||
from falyx.signals import HelpSignal
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user