Compare commits
3 Commits
b14004c989
...
argparse-i
Author | SHA1 | Date | |
---|---|---|---|
afa47b0bac
|
|||
70a527358d | |||
62276debd5
|
40
examples/auto_args_group.py
Normal file
40
examples/auto_args_group.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from falyx import Action, ActionGroup, Command, Falyx
|
||||||
|
|
||||||
|
|
||||||
|
# Define a shared async function
|
||||||
|
async def say_hello(name: str, excited: bool = False):
|
||||||
|
if excited:
|
||||||
|
print(f"Hello, {name}!!!")
|
||||||
|
else:
|
||||||
|
print(f"Hello, {name}.")
|
||||||
|
|
||||||
|
|
||||||
|
# Wrap the same callable in multiple Actions
|
||||||
|
action1 = Action("say_hello_1", action=say_hello)
|
||||||
|
action2 = Action("say_hello_2", action=say_hello)
|
||||||
|
action3 = Action("say_hello_3", action=say_hello)
|
||||||
|
|
||||||
|
# Combine into an ActionGroup
|
||||||
|
group = ActionGroup(name="greet_group", actions=[action1, action2, action3])
|
||||||
|
|
||||||
|
# Create the Command with auto_args=True
|
||||||
|
cmd = Command(
|
||||||
|
key="G",
|
||||||
|
description="Greet someone with multiple variations.",
|
||||||
|
action=group,
|
||||||
|
auto_args=True,
|
||||||
|
arg_metadata={
|
||||||
|
"name": {
|
||||||
|
"help": "The name of the person to greet.",
|
||||||
|
},
|
||||||
|
"excited": {
|
||||||
|
"help": "Whether to greet excitedly.",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
flx = Falyx("Test Group")
|
||||||
|
flx.add_command_from_command(cmd)
|
||||||
|
asyncio.run(flx.run())
|
32
examples/auto_parse_demo.py
Normal file
32
examples/auto_parse_demo.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from falyx import Action, Falyx
|
||||||
|
|
||||||
|
|
||||||
|
async def deploy(service: str, region: str = "us-east-1", verbose: bool = False):
|
||||||
|
if verbose:
|
||||||
|
print(f"Deploying {service} to {region}...")
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
if verbose:
|
||||||
|
print(f"{service} deployed successfully!")
|
||||||
|
|
||||||
|
|
||||||
|
flx = Falyx("Deployment CLI")
|
||||||
|
|
||||||
|
flx.add_command(
|
||||||
|
key="D",
|
||||||
|
aliases=["deploy"],
|
||||||
|
description="Deploy a service to a specified region.",
|
||||||
|
action=Action(
|
||||||
|
name="deploy_service",
|
||||||
|
action=deploy,
|
||||||
|
),
|
||||||
|
auto_args=True,
|
||||||
|
arg_metadata={
|
||||||
|
"service": "Service name",
|
||||||
|
"region": {"help": "Deployment region", "choices": ["us-east-1", "us-west-2"]},
|
||||||
|
"verbose": {"help": "Enable verbose mode"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
asyncio.run(flx.run())
|
@ -8,9 +8,9 @@ setup_logging()
|
|||||||
|
|
||||||
|
|
||||||
# A flaky async step that fails randomly
|
# A flaky async step that fails randomly
|
||||||
async def flaky_step():
|
async def flaky_step() -> str:
|
||||||
await asyncio.sleep(0.2)
|
await asyncio.sleep(0.2)
|
||||||
if random.random() < 0.5:
|
if random.random() < 0.3:
|
||||||
raise RuntimeError("Random failure!")
|
raise RuntimeError("Random failure!")
|
||||||
print("Flaky step succeeded!")
|
print("Flaky step succeeded!")
|
||||||
return "ok"
|
return "ok"
|
||||||
|
0
falyx/action/.pytyped
Normal file
0
falyx/action/.pytyped
Normal file
@ -224,7 +224,10 @@ class ShellAction(BaseIOAction):
|
|||||||
# Replace placeholder in template, or use raw input as full command
|
# Replace placeholder in template, or use raw input as full command
|
||||||
command = self.command_template.format(parsed_input)
|
command = self.command_template.format(parsed_input)
|
||||||
if self.safe_mode:
|
if self.safe_mode:
|
||||||
args = shlex.split(command)
|
try:
|
||||||
|
args = shlex.split(command)
|
||||||
|
except ValueError as error:
|
||||||
|
raise FalyxError(f"Invalid command template: {error}")
|
||||||
result = subprocess.run(args, capture_output=True, text=True, check=True)
|
result = subprocess.run(args, capture_output=True, text=True, check=True)
|
||||||
else:
|
else:
|
||||||
result = subprocess.run(
|
result = subprocess.run(
|
||||||
|
102
falyx/command.py
102
falyx/command.py
@ -18,6 +18,7 @@ in building robust interactive menus.
|
|||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import shlex
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
|
|
||||||
@ -26,7 +27,13 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
|
|||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.tree import Tree
|
from rich.tree import Tree
|
||||||
|
|
||||||
from falyx.action.action import Action, ActionGroup, BaseAction, ChainedAction
|
from falyx.action.action import (
|
||||||
|
Action,
|
||||||
|
ActionGroup,
|
||||||
|
BaseAction,
|
||||||
|
ChainedAction,
|
||||||
|
ProcessAction,
|
||||||
|
)
|
||||||
from falyx.action.io_action import BaseIOAction
|
from falyx.action.io_action import BaseIOAction
|
||||||
from falyx.context import ExecutionContext
|
from falyx.context import ExecutionContext
|
||||||
from falyx.debug import register_debug_hooks
|
from falyx.debug import register_debug_hooks
|
||||||
@ -34,7 +41,13 @@ from falyx.execution_registry import ExecutionRegistry as er
|
|||||||
from falyx.hook_manager import HookManager, HookType
|
from falyx.hook_manager import HookManager, HookType
|
||||||
from falyx.logger import logger
|
from falyx.logger import logger
|
||||||
from falyx.options_manager import OptionsManager
|
from falyx.options_manager import OptionsManager
|
||||||
|
from falyx.parsers import (
|
||||||
|
CommandArgumentParser,
|
||||||
|
infer_args_from_func,
|
||||||
|
same_argument_definitions,
|
||||||
|
)
|
||||||
from falyx.prompt_utils import confirm_async, should_prompt_user
|
from falyx.prompt_utils import confirm_async, should_prompt_user
|
||||||
|
from falyx.protocols import ArgParserProtocol
|
||||||
from falyx.retry import RetryPolicy
|
from falyx.retry import RetryPolicy
|
||||||
from falyx.retry_utils import enable_retries_recursively
|
from falyx.retry_utils import enable_retries_recursively
|
||||||
from falyx.signals import CancelSignal
|
from falyx.signals import CancelSignal
|
||||||
@ -87,6 +100,11 @@ class Command(BaseModel):
|
|||||||
tags (list[str]): Organizational tags for the command.
|
tags (list[str]): Organizational tags for the command.
|
||||||
logging_hooks (bool): Whether to attach logging hooks automatically.
|
logging_hooks (bool): Whether to attach logging hooks automatically.
|
||||||
requires_input (bool | None): Indicates if the action needs input.
|
requires_input (bool | None): Indicates if the action needs input.
|
||||||
|
options_manager (OptionsManager): Manages global command-line options.
|
||||||
|
arg_parser (CommandArgumentParser): Parses command arguments.
|
||||||
|
custom_parser (ArgParserProtocol | None): Custom argument parser.
|
||||||
|
custom_help (Callable[[], str | None] | None): Custom help message generator.
|
||||||
|
auto_args (bool): Automatically infer arguments from the action.
|
||||||
|
|
||||||
Methods:
|
Methods:
|
||||||
__call__(): Executes the command, respecting hooks and retries.
|
__call__(): Executes the command, respecting hooks and retries.
|
||||||
@ -98,12 +116,13 @@ class Command(BaseModel):
|
|||||||
|
|
||||||
key: str
|
key: str
|
||||||
description: str
|
description: str
|
||||||
action: BaseAction | Callable[[], Any]
|
action: BaseAction | Callable[[Any], Any]
|
||||||
args: tuple = ()
|
args: tuple = ()
|
||||||
kwargs: dict[str, Any] = Field(default_factory=dict)
|
kwargs: dict[str, Any] = Field(default_factory=dict)
|
||||||
hidden: bool = False
|
hidden: bool = False
|
||||||
aliases: list[str] = Field(default_factory=list)
|
aliases: list[str] = Field(default_factory=list)
|
||||||
help_text: str = ""
|
help_text: str = ""
|
||||||
|
help_epilogue: str = ""
|
||||||
style: str = OneColors.WHITE
|
style: str = OneColors.WHITE
|
||||||
confirm: bool = False
|
confirm: bool = False
|
||||||
confirm_message: str = "Are you sure?"
|
confirm_message: str = "Are you sure?"
|
||||||
@ -121,11 +140,46 @@ class Command(BaseModel):
|
|||||||
logging_hooks: bool = False
|
logging_hooks: bool = False
|
||||||
requires_input: bool | None = None
|
requires_input: bool | None = None
|
||||||
options_manager: OptionsManager = Field(default_factory=OptionsManager)
|
options_manager: OptionsManager = Field(default_factory=OptionsManager)
|
||||||
|
arg_parser: CommandArgumentParser = Field(default_factory=CommandArgumentParser)
|
||||||
|
arguments: list[dict[str, Any]] = Field(default_factory=list)
|
||||||
|
argument_config: Callable[[CommandArgumentParser], None] | None = None
|
||||||
|
custom_parser: ArgParserProtocol | None = None
|
||||||
|
custom_help: Callable[[], str | None] | None = None
|
||||||
|
auto_args: bool = False
|
||||||
|
arg_metadata: dict[str, str | dict[str, Any]] = Field(default_factory=dict)
|
||||||
|
|
||||||
_context: ExecutionContext | None = PrivateAttr(default=None)
|
_context: ExecutionContext | None = PrivateAttr(default=None)
|
||||||
|
|
||||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||||
|
|
||||||
|
def parse_args(
|
||||||
|
self, raw_args: list[str] | str, from_validate: bool = False
|
||||||
|
) -> tuple[tuple, dict]:
|
||||||
|
if self.custom_parser:
|
||||||
|
if isinstance(raw_args, str):
|
||||||
|
try:
|
||||||
|
raw_args = shlex.split(raw_args)
|
||||||
|
except ValueError:
|
||||||
|
logger.warning(
|
||||||
|
"[Command:%s] Failed to split arguments: %s",
|
||||||
|
self.key,
|
||||||
|
raw_args,
|
||||||
|
)
|
||||||
|
return ((), {})
|
||||||
|
return self.custom_parser(raw_args)
|
||||||
|
|
||||||
|
if isinstance(raw_args, str):
|
||||||
|
try:
|
||||||
|
raw_args = shlex.split(raw_args)
|
||||||
|
except ValueError:
|
||||||
|
logger.warning(
|
||||||
|
"[Command:%s] Failed to split arguments: %s",
|
||||||
|
self.key,
|
||||||
|
raw_args,
|
||||||
|
)
|
||||||
|
return ((), {})
|
||||||
|
return self.arg_parser.parse_args_split(raw_args, from_validate=from_validate)
|
||||||
|
|
||||||
@field_validator("action", mode="before")
|
@field_validator("action", mode="before")
|
||||||
@classmethod
|
@classmethod
|
||||||
def wrap_callable_as_async(cls, action: Any) -> Any:
|
def wrap_callable_as_async(cls, action: Any) -> Any:
|
||||||
@ -135,6 +189,35 @@ class Command(BaseModel):
|
|||||||
return ensure_async(action)
|
return ensure_async(action)
|
||||||
raise TypeError("Action must be a callable or an instance of BaseAction")
|
raise TypeError("Action must be a callable or an instance of BaseAction")
|
||||||
|
|
||||||
|
def get_argument_definitions(self) -> list[dict[str, Any]]:
|
||||||
|
if self.arguments:
|
||||||
|
return self.arguments
|
||||||
|
elif self.argument_config:
|
||||||
|
self.argument_config(self.arg_parser)
|
||||||
|
elif self.auto_args:
|
||||||
|
if isinstance(self.action, (Action, ProcessAction)):
|
||||||
|
return infer_args_from_func(self.action.action, self.arg_metadata)
|
||||||
|
elif isinstance(self.action, ChainedAction):
|
||||||
|
if self.action.actions:
|
||||||
|
action = self.action.actions[0]
|
||||||
|
if isinstance(action, Action):
|
||||||
|
return infer_args_from_func(action.action, self.arg_metadata)
|
||||||
|
elif callable(action):
|
||||||
|
return infer_args_from_func(action, self.arg_metadata)
|
||||||
|
elif isinstance(self.action, ActionGroup):
|
||||||
|
arg_defs = same_argument_definitions(
|
||||||
|
self.action.actions, self.arg_metadata
|
||||||
|
)
|
||||||
|
if arg_defs:
|
||||||
|
return arg_defs
|
||||||
|
logger.debug(
|
||||||
|
"[Command:%s] auto_args disabled: mismatched ActionGroup arguments",
|
||||||
|
self.key,
|
||||||
|
)
|
||||||
|
elif callable(self.action):
|
||||||
|
return infer_args_from_func(self.action, self.arg_metadata)
|
||||||
|
return []
|
||||||
|
|
||||||
def model_post_init(self, _: Any) -> None:
|
def model_post_init(self, _: Any) -> None:
|
||||||
"""Post-initialization to set up the action and hooks."""
|
"""Post-initialization to set up the action and hooks."""
|
||||||
if self.retry and isinstance(self.action, Action):
|
if self.retry and isinstance(self.action, Action):
|
||||||
@ -164,6 +247,9 @@ class Command(BaseModel):
|
|||||||
elif self.requires_input is None:
|
elif self.requires_input is None:
|
||||||
self.requires_input = False
|
self.requires_input = False
|
||||||
|
|
||||||
|
for arg_def in self.get_argument_definitions():
|
||||||
|
self.arg_parser.add_argument(*arg_def.pop("flags"), **arg_def)
|
||||||
|
|
||||||
@cached_property
|
@cached_property
|
||||||
def detect_requires_input(self) -> bool:
|
def detect_requires_input(self) -> bool:
|
||||||
"""Detect if the action requires input based on its type."""
|
"""Detect if the action requires input based on its type."""
|
||||||
@ -269,6 +355,18 @@ class Command(BaseModel):
|
|||||||
if self._context:
|
if self._context:
|
||||||
self._context.log_summary()
|
self._context.log_summary()
|
||||||
|
|
||||||
|
def show_help(self) -> bool:
|
||||||
|
"""Display the help message for the command."""
|
||||||
|
if self.custom_help:
|
||||||
|
output = self.custom_help()
|
||||||
|
if output:
|
||||||
|
console.print(output)
|
||||||
|
return True
|
||||||
|
if isinstance(self.arg_parser, CommandArgumentParser):
|
||||||
|
self.arg_parser.render_help()
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
async def preview(self) -> None:
|
async def preview(self) -> None:
|
||||||
label = f"[{OneColors.GREEN_b}]Command:[/] '{self.key}' — {self.description}"
|
label = f"[{OneColors.GREEN_b}]Command:[/] '{self.key}' — {self.description}"
|
||||||
|
|
||||||
|
@ -28,3 +28,7 @@ class CircuitBreakerOpen(FalyxError):
|
|||||||
|
|
||||||
class EmptyChainError(FalyxError):
|
class EmptyChainError(FalyxError):
|
||||||
"""Exception raised when the chain is empty."""
|
"""Exception raised when the chain is empty."""
|
||||||
|
|
||||||
|
|
||||||
|
class CommandArgumentError(FalyxError):
|
||||||
|
"""Exception raised when there is an error in the command argument parser."""
|
||||||
|
161
falyx/falyx.py
161
falyx/falyx.py
@ -23,6 +23,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import shlex
|
||||||
import sys
|
import sys
|
||||||
from argparse import Namespace
|
from argparse import Namespace
|
||||||
from difflib import get_close_matches
|
from difflib import get_close_matches
|
||||||
@ -34,7 +35,8 @@ from prompt_toolkit import PromptSession
|
|||||||
from prompt_toolkit.completion import WordCompleter
|
from prompt_toolkit.completion import WordCompleter
|
||||||
from prompt_toolkit.formatted_text import AnyFormattedText
|
from prompt_toolkit.formatted_text import AnyFormattedText
|
||||||
from prompt_toolkit.key_binding import KeyBindings
|
from prompt_toolkit.key_binding import KeyBindings
|
||||||
from prompt_toolkit.validation import Validator
|
from prompt_toolkit.patch_stdout import patch_stdout
|
||||||
|
from prompt_toolkit.validation import ValidationError, Validator
|
||||||
from rich import box
|
from rich import box
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.markdown import Markdown
|
from rich.markdown import Markdown
|
||||||
@ -47,6 +49,7 @@ from falyx.context import ExecutionContext
|
|||||||
from falyx.debug import log_after, log_before, log_error, log_success
|
from falyx.debug import log_after, log_before, log_error, log_success
|
||||||
from falyx.exceptions import (
|
from falyx.exceptions import (
|
||||||
CommandAlreadyExistsError,
|
CommandAlreadyExistsError,
|
||||||
|
CommandArgumentError,
|
||||||
FalyxError,
|
FalyxError,
|
||||||
InvalidActionError,
|
InvalidActionError,
|
||||||
NotAFalyxError,
|
NotAFalyxError,
|
||||||
@ -55,21 +58,42 @@ from falyx.execution_registry import ExecutionRegistry as er
|
|||||||
from falyx.hook_manager import Hook, HookManager, HookType
|
from falyx.hook_manager import Hook, HookManager, HookType
|
||||||
from falyx.logger import logger
|
from falyx.logger import logger
|
||||||
from falyx.options_manager import OptionsManager
|
from falyx.options_manager import OptionsManager
|
||||||
from falyx.parsers import get_arg_parsers
|
from falyx.parsers import CommandArgumentParser, get_arg_parsers
|
||||||
|
from falyx.protocols import ArgParserProtocol
|
||||||
from falyx.retry import RetryPolicy
|
from falyx.retry import RetryPolicy
|
||||||
from falyx.signals import BackSignal, CancelSignal, QuitSignal
|
from falyx.signals import BackSignal, CancelSignal, FlowSignal, HelpSignal, QuitSignal
|
||||||
from falyx.themes import OneColors, get_nord_theme
|
from falyx.themes import OneColors, get_nord_theme
|
||||||
from falyx.utils import CaseInsensitiveDict, _noop, chunks, get_program_invocation
|
from falyx.utils import CaseInsensitiveDict, _noop, chunks, get_program_invocation
|
||||||
from falyx.version import __version__
|
from falyx.version import __version__
|
||||||
|
|
||||||
|
|
||||||
class FalyxMode(str, Enum):
|
class FalyxMode(Enum):
|
||||||
MENU = "menu"
|
MENU = "menu"
|
||||||
RUN = "run"
|
RUN = "run"
|
||||||
PREVIEW = "preview"
|
PREVIEW = "preview"
|
||||||
RUN_ALL = "run-all"
|
RUN_ALL = "run-all"
|
||||||
|
|
||||||
|
|
||||||
|
class CommandValidator(Validator):
|
||||||
|
"""Validator to check if the input is a valid command or toggle key."""
|
||||||
|
|
||||||
|
def __init__(self, falyx: Falyx, error_message: str) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.falyx = falyx
|
||||||
|
self.error_message = error_message
|
||||||
|
|
||||||
|
def validate(self, document) -> None:
|
||||||
|
text = document.text
|
||||||
|
is_preview, choice, _, __ = self.falyx.get_command(text, from_validate=True)
|
||||||
|
if is_preview:
|
||||||
|
return None
|
||||||
|
if not choice:
|
||||||
|
raise ValidationError(
|
||||||
|
message=self.error_message,
|
||||||
|
cursor_position=document.get_end_of_document_position(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Falyx:
|
class Falyx:
|
||||||
"""
|
"""
|
||||||
Main menu controller for Falyx CLI applications.
|
Main menu controller for Falyx CLI applications.
|
||||||
@ -325,7 +349,7 @@ class Falyx:
|
|||||||
keys.extend(cmd.aliases)
|
keys.extend(cmd.aliases)
|
||||||
return WordCompleter(keys, ignore_case=True)
|
return WordCompleter(keys, ignore_case=True)
|
||||||
|
|
||||||
def _get_validator(self) -> Validator:
|
def _get_validator_error_message(self) -> str:
|
||||||
"""Validator to check if the input is a valid command or toggle key."""
|
"""Validator to check if the input is a valid command or toggle key."""
|
||||||
keys = {self.exit_command.key.upper()}
|
keys = {self.exit_command.key.upper()}
|
||||||
keys.update({alias.upper() for alias in self.exit_command.aliases})
|
keys.update({alias.upper() for alias in self.exit_command.aliases})
|
||||||
@ -354,18 +378,7 @@ class Falyx:
|
|||||||
if toggle_keys:
|
if toggle_keys:
|
||||||
message_lines.append(f" Toggles: {toggles_str}")
|
message_lines.append(f" Toggles: {toggles_str}")
|
||||||
error_message = " ".join(message_lines)
|
error_message = " ".join(message_lines)
|
||||||
|
return error_message
|
||||||
def validator(text):
|
|
||||||
is_preview, choice = self.get_command(text, from_validate=True)
|
|
||||||
if is_preview and choice is None:
|
|
||||||
return True
|
|
||||||
return bool(choice)
|
|
||||||
|
|
||||||
return Validator.from_callable(
|
|
||||||
validator,
|
|
||||||
error_message=error_message,
|
|
||||||
move_cursor_to_end=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _invalidate_prompt_session_cache(self):
|
def _invalidate_prompt_session_cache(self):
|
||||||
"""Forces the prompt session to be recreated on the next access."""
|
"""Forces the prompt session to be recreated on the next access."""
|
||||||
@ -428,9 +441,11 @@ class Falyx:
|
|||||||
multiline=False,
|
multiline=False,
|
||||||
completer=self._get_completer(),
|
completer=self._get_completer(),
|
||||||
reserve_space_for_menu=1,
|
reserve_space_for_menu=1,
|
||||||
validator=self._get_validator(),
|
validator=CommandValidator(self, self._get_validator_error_message()),
|
||||||
bottom_toolbar=self._get_bottom_bar_render(),
|
bottom_toolbar=self._get_bottom_bar_render(),
|
||||||
key_bindings=self.key_bindings,
|
key_bindings=self.key_bindings,
|
||||||
|
validate_while_typing=False,
|
||||||
|
interrupt_exception=FlowSignal,
|
||||||
)
|
)
|
||||||
return self._prompt_session
|
return self._prompt_session
|
||||||
|
|
||||||
@ -511,7 +526,7 @@ class Falyx:
|
|||||||
key: str = "X",
|
key: str = "X",
|
||||||
description: str = "Exit",
|
description: str = "Exit",
|
||||||
aliases: list[str] | None = None,
|
aliases: list[str] | None = None,
|
||||||
action: Callable[[], Any] | None = None,
|
action: Callable[[Any], Any] | None = None,
|
||||||
style: str = OneColors.DARK_RED,
|
style: str = OneColors.DARK_RED,
|
||||||
confirm: bool = False,
|
confirm: bool = False,
|
||||||
confirm_message: str = "Are you sure?",
|
confirm_message: str = "Are you sure?",
|
||||||
@ -565,13 +580,14 @@ class Falyx:
|
|||||||
self,
|
self,
|
||||||
key: str,
|
key: str,
|
||||||
description: str,
|
description: str,
|
||||||
action: BaseAction | Callable[[], Any],
|
action: BaseAction | Callable[[Any], Any],
|
||||||
*,
|
*,
|
||||||
args: tuple = (),
|
args: tuple = (),
|
||||||
kwargs: dict[str, Any] | None = None,
|
kwargs: dict[str, Any] | None = None,
|
||||||
hidden: bool = False,
|
hidden: bool = False,
|
||||||
aliases: list[str] | None = None,
|
aliases: list[str] | None = None,
|
||||||
help_text: str = "",
|
help_text: str = "",
|
||||||
|
help_epilogue: str = "",
|
||||||
style: str = OneColors.WHITE,
|
style: str = OneColors.WHITE,
|
||||||
confirm: bool = False,
|
confirm: bool = False,
|
||||||
confirm_message: str = "Are you sure?",
|
confirm_message: str = "Are you sure?",
|
||||||
@ -593,9 +609,33 @@ class Falyx:
|
|||||||
retry_all: bool = False,
|
retry_all: bool = False,
|
||||||
retry_policy: RetryPolicy | None = None,
|
retry_policy: RetryPolicy | None = None,
|
||||||
requires_input: bool | None = None,
|
requires_input: bool | None = None,
|
||||||
|
arg_parser: CommandArgumentParser | None = None,
|
||||||
|
arguments: list[dict[str, Any]] | None = None,
|
||||||
|
argument_config: Callable[[CommandArgumentParser], None] | None = None,
|
||||||
|
custom_parser: ArgParserProtocol | None = None,
|
||||||
|
custom_help: Callable[[], str | None] | None = None,
|
||||||
|
auto_args: bool = False,
|
||||||
|
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||||
) -> Command:
|
) -> Command:
|
||||||
"""Adds an command to the menu, preventing duplicates."""
|
"""Adds an command to the menu, preventing duplicates."""
|
||||||
self._validate_command_key(key)
|
self._validate_command_key(key)
|
||||||
|
|
||||||
|
if arg_parser:
|
||||||
|
if not isinstance(arg_parser, CommandArgumentParser):
|
||||||
|
raise NotAFalyxError(
|
||||||
|
"arg_parser must be an instance of CommandArgumentParser."
|
||||||
|
)
|
||||||
|
arg_parser = arg_parser
|
||||||
|
else:
|
||||||
|
arg_parser = CommandArgumentParser(
|
||||||
|
command_key=key,
|
||||||
|
command_description=description,
|
||||||
|
command_style=style,
|
||||||
|
help_text=help_text,
|
||||||
|
help_epilogue=help_epilogue,
|
||||||
|
aliases=aliases,
|
||||||
|
)
|
||||||
|
|
||||||
command = Command(
|
command = Command(
|
||||||
key=key,
|
key=key,
|
||||||
description=description,
|
description=description,
|
||||||
@ -605,6 +645,7 @@ class Falyx:
|
|||||||
hidden=hidden,
|
hidden=hidden,
|
||||||
aliases=aliases if aliases else [],
|
aliases=aliases if aliases else [],
|
||||||
help_text=help_text,
|
help_text=help_text,
|
||||||
|
help_epilogue=help_epilogue,
|
||||||
style=style,
|
style=style,
|
||||||
confirm=confirm,
|
confirm=confirm,
|
||||||
confirm_message=confirm_message,
|
confirm_message=confirm_message,
|
||||||
@ -621,6 +662,13 @@ class Falyx:
|
|||||||
retry_policy=retry_policy or RetryPolicy(),
|
retry_policy=retry_policy or RetryPolicy(),
|
||||||
requires_input=requires_input,
|
requires_input=requires_input,
|
||||||
options_manager=self.options,
|
options_manager=self.options,
|
||||||
|
arg_parser=arg_parser,
|
||||||
|
arguments=arguments or [],
|
||||||
|
argument_config=argument_config,
|
||||||
|
custom_parser=custom_parser,
|
||||||
|
custom_help=custom_help,
|
||||||
|
auto_args=auto_args,
|
||||||
|
arg_metadata=arg_metadata or {},
|
||||||
)
|
)
|
||||||
|
|
||||||
if hooks:
|
if hooks:
|
||||||
@ -694,32 +742,57 @@ class Falyx:
|
|||||||
return False, input_str.strip()
|
return False, input_str.strip()
|
||||||
|
|
||||||
def get_command(
|
def get_command(
|
||||||
self, choice: str, from_validate=False
|
self, raw_choices: str, from_validate=False
|
||||||
) -> tuple[bool, Command | None]:
|
) -> tuple[bool, Command | None, tuple, dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Returns the selected command based on user input.
|
Returns the selected command based on user input.
|
||||||
Supports keys, aliases, and abbreviations.
|
Supports keys, aliases, and abbreviations.
|
||||||
"""
|
"""
|
||||||
|
args = ()
|
||||||
|
kwargs: dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
choice, *input_args = shlex.split(raw_choices)
|
||||||
|
except ValueError:
|
||||||
|
return False, None, args, kwargs
|
||||||
is_preview, choice = self.parse_preview_command(choice)
|
is_preview, choice = self.parse_preview_command(choice)
|
||||||
if is_preview and not choice and self.help_command:
|
if is_preview and not choice and self.help_command:
|
||||||
is_preview = False
|
is_preview = False
|
||||||
choice = "?"
|
choice = "?"
|
||||||
elif is_preview and not choice:
|
elif is_preview and not choice:
|
||||||
|
# No help command enabled
|
||||||
if not from_validate:
|
if not from_validate:
|
||||||
self.console.print(
|
self.console.print(
|
||||||
f"[{OneColors.DARK_RED}]❌ You must enter a command for preview mode."
|
f"[{OneColors.DARK_RED}]❌ You must enter a command for preview mode."
|
||||||
)
|
)
|
||||||
return is_preview, None
|
return is_preview, None, args, kwargs
|
||||||
|
|
||||||
choice = choice.upper()
|
choice = choice.upper()
|
||||||
name_map = self._name_map
|
name_map = self._name_map
|
||||||
|
|
||||||
if choice in name_map:
|
if choice in name_map:
|
||||||
return is_preview, name_map[choice]
|
if not from_validate:
|
||||||
|
logger.info("Command '%s' selected.", choice)
|
||||||
|
if input_args and name_map[choice].arg_parser:
|
||||||
|
try:
|
||||||
|
args, kwargs = name_map[choice].parse_args(input_args, from_validate)
|
||||||
|
except CommandArgumentError as error:
|
||||||
|
if not from_validate:
|
||||||
|
if not name_map[choice].show_help():
|
||||||
|
self.console.print(
|
||||||
|
f"[{OneColors.DARK_RED}]❌ Invalid arguments for '{choice}': {error}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
name_map[choice].show_help()
|
||||||
|
raise ValidationError(
|
||||||
|
message=str(error), cursor_position=len(raw_choices)
|
||||||
|
)
|
||||||
|
return is_preview, None, args, kwargs
|
||||||
|
except HelpSignal:
|
||||||
|
return True, None, args, kwargs
|
||||||
|
return is_preview, name_map[choice], args, kwargs
|
||||||
|
|
||||||
prefix_matches = [cmd for key, cmd in name_map.items() if key.startswith(choice)]
|
prefix_matches = [cmd for key, cmd in name_map.items() if key.startswith(choice)]
|
||||||
if len(prefix_matches) == 1:
|
if len(prefix_matches) == 1:
|
||||||
return is_preview, prefix_matches[0]
|
return is_preview, prefix_matches[0], args, kwargs
|
||||||
|
|
||||||
fuzzy_matches = get_close_matches(choice, list(name_map.keys()), n=3, cutoff=0.7)
|
fuzzy_matches = get_close_matches(choice, list(name_map.keys()), n=3, cutoff=0.7)
|
||||||
if fuzzy_matches:
|
if fuzzy_matches:
|
||||||
@ -736,7 +809,7 @@ class Falyx:
|
|||||||
self.console.print(
|
self.console.print(
|
||||||
f"[{OneColors.LIGHT_YELLOW}]⚠️ Unknown command '{choice}'[/]"
|
f"[{OneColors.LIGHT_YELLOW}]⚠️ Unknown command '{choice}'[/]"
|
||||||
)
|
)
|
||||||
return is_preview, None
|
return is_preview, None, args, kwargs
|
||||||
|
|
||||||
def _create_context(self, selected_command: Command) -> ExecutionContext:
|
def _create_context(self, selected_command: Command) -> ExecutionContext:
|
||||||
"""Creates a context dictionary for the selected command."""
|
"""Creates a context dictionary for the selected command."""
|
||||||
@ -759,8 +832,9 @@ class Falyx:
|
|||||||
|
|
||||||
async def process_command(self) -> bool:
|
async def process_command(self) -> bool:
|
||||||
"""Processes the action of the selected command."""
|
"""Processes the action of the selected command."""
|
||||||
choice = await self.prompt_session.prompt_async()
|
with patch_stdout(raw=True):
|
||||||
is_preview, selected_command = self.get_command(choice)
|
choice = await self.prompt_session.prompt_async()
|
||||||
|
is_preview, selected_command, args, kwargs = self.get_command(choice)
|
||||||
if not selected_command:
|
if not selected_command:
|
||||||
logger.info("Invalid command '%s'.", choice)
|
logger.info("Invalid command '%s'.", choice)
|
||||||
return True
|
return True
|
||||||
@ -789,8 +863,7 @@ class Falyx:
|
|||||||
context.start_timer()
|
context.start_timer()
|
||||||
try:
|
try:
|
||||||
await self.hooks.trigger(HookType.BEFORE, context)
|
await self.hooks.trigger(HookType.BEFORE, context)
|
||||||
|
result = await selected_command(*args, **kwargs)
|
||||||
result = await selected_command()
|
|
||||||
context.result = result
|
context.result = result
|
||||||
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
@ -803,10 +876,18 @@ class Falyx:
|
|||||||
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
|
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def run_key(self, command_key: str, return_context: bool = False) -> Any:
|
async def run_key(
|
||||||
|
self,
|
||||||
|
command_key: str,
|
||||||
|
return_context: bool = False,
|
||||||
|
args: tuple = (),
|
||||||
|
kwargs: dict[str, Any] | None = None,
|
||||||
|
) -> Any:
|
||||||
"""Run a command by key without displaying the menu (non-interactive mode)."""
|
"""Run a command by key without displaying the menu (non-interactive mode)."""
|
||||||
self.debug_hooks()
|
self.debug_hooks()
|
||||||
is_preview, selected_command = self.get_command(command_key)
|
is_preview, selected_command, _, __ = self.get_command(command_key)
|
||||||
|
kwargs = kwargs or {}
|
||||||
|
|
||||||
self.last_run_command = selected_command
|
self.last_run_command = selected_command
|
||||||
|
|
||||||
if not selected_command:
|
if not selected_command:
|
||||||
@ -827,7 +908,7 @@ class Falyx:
|
|||||||
context.start_timer()
|
context.start_timer()
|
||||||
try:
|
try:
|
||||||
await self.hooks.trigger(HookType.BEFORE, context)
|
await self.hooks.trigger(HookType.BEFORE, context)
|
||||||
result = await selected_command()
|
result = await selected_command(*args, **kwargs)
|
||||||
context.result = result
|
context.result = result
|
||||||
|
|
||||||
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
await self.hooks.trigger(HookType.ON_SUCCESS, context)
|
||||||
@ -951,12 +1032,12 @@ class Falyx:
|
|||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
if self.cli_args.command == "version" or self.cli_args.version:
|
if self.cli_args.command == "version" or self.cli_args.version:
|
||||||
self.console.print(f"[{OneColors.GREEN_b}]Falyx CLI v{__version__}[/]")
|
self.console.print(f"[{OneColors.BLUE_b}]Falyx CLI v{__version__}[/]")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
if self.cli_args.command == "preview":
|
if self.cli_args.command == "preview":
|
||||||
self.mode = FalyxMode.PREVIEW
|
self.mode = FalyxMode.PREVIEW
|
||||||
_, command = self.get_command(self.cli_args.name)
|
_, command, args, kwargs = self.get_command(self.cli_args.name)
|
||||||
if not command:
|
if not command:
|
||||||
self.console.print(
|
self.console.print(
|
||||||
f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found."
|
f"[{OneColors.DARK_RED}]❌ Command '{self.cli_args.name}' not found."
|
||||||
@ -970,7 +1051,7 @@ class Falyx:
|
|||||||
|
|
||||||
if self.cli_args.command == "run":
|
if self.cli_args.command == "run":
|
||||||
self.mode = FalyxMode.RUN
|
self.mode = FalyxMode.RUN
|
||||||
is_preview, command = self.get_command(self.cli_args.name)
|
is_preview, command, _, __ = self.get_command(self.cli_args.name)
|
||||||
if is_preview:
|
if is_preview:
|
||||||
if command is None:
|
if command is None:
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
@ -981,7 +1062,11 @@ class Falyx:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
self._set_retry_policy(command)
|
self._set_retry_policy(command)
|
||||||
try:
|
try:
|
||||||
await self.run_key(self.cli_args.name)
|
args, kwargs = command.parse_args(self.cli_args.command_args)
|
||||||
|
except HelpSignal:
|
||||||
|
sys.exit(0)
|
||||||
|
try:
|
||||||
|
await self.run_key(self.cli_args.name, args=args, kwargs=kwargs)
|
||||||
except FalyxError as error:
|
except FalyxError as error:
|
||||||
self.console.print(f"[{OneColors.DARK_RED}]❌ Error: {error}[/]")
|
self.console.print(f"[{OneColors.DARK_RED}]❌ Error: {error}[/]")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
0
falyx/parsers/.pytyped
Normal file
0
falyx/parsers/.pytyped
Normal file
21
falyx/parsers/__init__.py
Normal file
21
falyx/parsers/__init__.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
"""
|
||||||
|
Falyx CLI Framework
|
||||||
|
|
||||||
|
Copyright (c) 2025 rtj.dev LLC.
|
||||||
|
Licensed under the MIT License. See LICENSE file for details.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from .argparse import Argument, ArgumentAction, CommandArgumentParser
|
||||||
|
from .parsers import FalyxParsers, get_arg_parsers
|
||||||
|
from .signature import infer_args_from_func
|
||||||
|
from .utils import same_argument_definitions
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"Argument",
|
||||||
|
"ArgumentAction",
|
||||||
|
"CommandArgumentParser",
|
||||||
|
"get_arg_parsers",
|
||||||
|
"FalyxParsers",
|
||||||
|
"infer_args_from_func",
|
||||||
|
"same_argument_definitions",
|
||||||
|
]
|
756
falyx/parsers/argparse.py
Normal file
756
falyx/parsers/argparse.py
Normal file
@ -0,0 +1,756 @@
|
|||||||
|
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
|
||||||
|
from copy import deepcopy
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Any, Iterable
|
||||||
|
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.markup import escape
|
||||||
|
from rich.text import Text
|
||||||
|
|
||||||
|
from falyx.exceptions import CommandArgumentError
|
||||||
|
from falyx.signals import HelpSignal
|
||||||
|
|
||||||
|
|
||||||
|
class ArgumentAction(Enum):
|
||||||
|
"""Defines the action to be taken when the argument is encountered."""
|
||||||
|
|
||||||
|
STORE = "store"
|
||||||
|
STORE_TRUE = "store_true"
|
||||||
|
STORE_FALSE = "store_false"
|
||||||
|
APPEND = "append"
|
||||||
|
EXTEND = "extend"
|
||||||
|
COUNT = "count"
|
||||||
|
HELP = "help"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Argument:
|
||||||
|
"""Represents a command-line argument."""
|
||||||
|
|
||||||
|
flags: list[str]
|
||||||
|
dest: str # Destination name for the argument
|
||||||
|
action: ArgumentAction = (
|
||||||
|
ArgumentAction.STORE
|
||||||
|
) # Action to be taken when the argument is encountered
|
||||||
|
type: Any = str # Type of the argument (e.g., str, int, float) or callable
|
||||||
|
default: Any = None # Default value if the argument is not provided
|
||||||
|
choices: list[str] | None = None # List of valid choices for the argument
|
||||||
|
required: bool = False # True if the argument is required
|
||||||
|
help: str = "" # Help text for the argument
|
||||||
|
nargs: int | str = 1 # int, '?', '*', '+'
|
||||||
|
positional: bool = False # True if no leading - or -- in flags
|
||||||
|
|
||||||
|
def get_positional_text(self) -> str:
|
||||||
|
"""Get the positional text for the argument."""
|
||||||
|
text = ""
|
||||||
|
if self.positional:
|
||||||
|
if self.choices:
|
||||||
|
text = f"{{{','.join([str(choice) for choice in self.choices])}}}"
|
||||||
|
else:
|
||||||
|
text = self.dest
|
||||||
|
return text
|
||||||
|
|
||||||
|
def get_choice_text(self) -> str:
|
||||||
|
"""Get the choice text for the argument."""
|
||||||
|
choice_text = ""
|
||||||
|
if self.choices:
|
||||||
|
choice_text = f"{{{','.join([str(choice) for choice in self.choices])}}}"
|
||||||
|
elif (
|
||||||
|
self.action
|
||||||
|
in (
|
||||||
|
ArgumentAction.STORE,
|
||||||
|
ArgumentAction.APPEND,
|
||||||
|
ArgumentAction.EXTEND,
|
||||||
|
)
|
||||||
|
and not self.positional
|
||||||
|
):
|
||||||
|
choice_text = self.dest.upper()
|
||||||
|
elif isinstance(self.nargs, str):
|
||||||
|
choice_text = self.dest
|
||||||
|
|
||||||
|
if self.nargs == "?":
|
||||||
|
choice_text = f"[{choice_text}]"
|
||||||
|
elif self.nargs == "*":
|
||||||
|
choice_text = f"[{choice_text} ...]"
|
||||||
|
elif self.nargs == "+":
|
||||||
|
choice_text = f"{choice_text} [{choice_text} ...]"
|
||||||
|
return choice_text
|
||||||
|
|
||||||
|
def __eq__(self, other: object) -> bool:
|
||||||
|
if not isinstance(other, Argument):
|
||||||
|
return False
|
||||||
|
return (
|
||||||
|
self.flags == other.flags
|
||||||
|
and self.dest == other.dest
|
||||||
|
and self.action == other.action
|
||||||
|
and self.type == other.type
|
||||||
|
and self.choices == other.choices
|
||||||
|
and self.required == other.required
|
||||||
|
and self.nargs == other.nargs
|
||||||
|
and self.positional == other.positional
|
||||||
|
)
|
||||||
|
|
||||||
|
def __hash__(self) -> int:
|
||||||
|
return hash(
|
||||||
|
(
|
||||||
|
tuple(self.flags),
|
||||||
|
self.dest,
|
||||||
|
self.action,
|
||||||
|
self.type,
|
||||||
|
tuple(self.choices or []),
|
||||||
|
self.required,
|
||||||
|
self.nargs,
|
||||||
|
self.positional,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CommandArgumentParser:
|
||||||
|
"""
|
||||||
|
Custom argument parser for Falyx Commands.
|
||||||
|
It is used to create a command-line interface for Falyx
|
||||||
|
commands, allowing users to specify options and arguments
|
||||||
|
when executing commands.
|
||||||
|
It is not intended to be a full-featured replacement for
|
||||||
|
argparse, but rather a lightweight alternative for specific use
|
||||||
|
cases within the Falyx framework.
|
||||||
|
|
||||||
|
Features:
|
||||||
|
- Customizable argument parsing.
|
||||||
|
- Type coercion for arguments.
|
||||||
|
- Support for positional and keyword arguments.
|
||||||
|
- Support for default values.
|
||||||
|
- Support for boolean flags.
|
||||||
|
- Exception handling for invalid arguments.
|
||||||
|
- Render Help using Rich library.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
command_key: str = "",
|
||||||
|
command_description: str = "",
|
||||||
|
command_style: str = "bold",
|
||||||
|
help_text: str = "",
|
||||||
|
help_epilogue: str = "",
|
||||||
|
aliases: list[str] | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the CommandArgumentParser."""
|
||||||
|
self.command_key: str = command_key
|
||||||
|
self.command_description: str = command_description
|
||||||
|
self.command_style: str = command_style
|
||||||
|
self.help_text: str = help_text
|
||||||
|
self.help_epilogue: str = help_epilogue
|
||||||
|
self.aliases: list[str] = aliases or []
|
||||||
|
self._arguments: list[Argument] = []
|
||||||
|
self._positional: list[Argument] = []
|
||||||
|
self._keyword: list[Argument] = []
|
||||||
|
self._flag_map: dict[str, Argument] = {}
|
||||||
|
self._dest_set: set[str] = set()
|
||||||
|
self._add_help()
|
||||||
|
self.console = Console(color_system="auto")
|
||||||
|
|
||||||
|
def _add_help(self):
|
||||||
|
"""Add help argument to the parser."""
|
||||||
|
self.add_argument(
|
||||||
|
"-h",
|
||||||
|
"--help",
|
||||||
|
action=ArgumentAction.HELP,
|
||||||
|
help="Show this help message.",
|
||||||
|
dest="help",
|
||||||
|
)
|
||||||
|
|
||||||
|
def _is_positional(self, flags: tuple[str, ...]) -> bool:
|
||||||
|
"""Check if the flags are positional."""
|
||||||
|
positional = False
|
||||||
|
if any(not flag.startswith("-") for flag in flags):
|
||||||
|
positional = True
|
||||||
|
|
||||||
|
if positional and len(flags) > 1:
|
||||||
|
raise CommandArgumentError("Positional arguments cannot have multiple flags")
|
||||||
|
return positional
|
||||||
|
|
||||||
|
def _get_dest_from_flags(
|
||||||
|
self, flags: tuple[str, ...], dest: str | None
|
||||||
|
) -> str | None:
|
||||||
|
"""Convert flags to a destination name."""
|
||||||
|
if dest:
|
||||||
|
if not dest.replace("_", "").isalnum():
|
||||||
|
raise CommandArgumentError(
|
||||||
|
"dest must be a valid identifier (letters, digits, and underscores only)"
|
||||||
|
)
|
||||||
|
if dest[0].isdigit():
|
||||||
|
raise CommandArgumentError("dest must not start with a digit")
|
||||||
|
return dest
|
||||||
|
dest = None
|
||||||
|
for flag in flags:
|
||||||
|
if flag.startswith("--"):
|
||||||
|
dest = flag.lstrip("-").replace("-", "_").lower()
|
||||||
|
break
|
||||||
|
elif flag.startswith("-"):
|
||||||
|
dest = flag.lstrip("-").replace("-", "_").lower()
|
||||||
|
else:
|
||||||
|
dest = flag.replace("-", "_").lower()
|
||||||
|
assert dest is not None, "dest should not be None"
|
||||||
|
if not dest.replace("_", "").isalnum():
|
||||||
|
raise CommandArgumentError(
|
||||||
|
"dest must be a valid identifier (letters, digits, and underscores only)"
|
||||||
|
)
|
||||||
|
if dest[0].isdigit():
|
||||||
|
raise CommandArgumentError("dest must not start with a digit")
|
||||||
|
return dest
|
||||||
|
|
||||||
|
def _determine_required(
|
||||||
|
self, required: bool, positional: bool, nargs: int | str
|
||||||
|
) -> bool:
|
||||||
|
"""Determine if the argument is required."""
|
||||||
|
if required:
|
||||||
|
return True
|
||||||
|
if positional:
|
||||||
|
if isinstance(nargs, int):
|
||||||
|
return nargs > 0
|
||||||
|
elif isinstance(nargs, str):
|
||||||
|
if nargs in ("+"):
|
||||||
|
return True
|
||||||
|
elif nargs in ("*", "?"):
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
raise CommandArgumentError(f"Invalid nargs value: {nargs}")
|
||||||
|
|
||||||
|
return required
|
||||||
|
|
||||||
|
def _validate_nargs(self, nargs: int | str) -> int | str:
|
||||||
|
allowed_nargs = ("?", "*", "+")
|
||||||
|
if isinstance(nargs, int):
|
||||||
|
if nargs <= 0:
|
||||||
|
raise CommandArgumentError("nargs must be a positive integer")
|
||||||
|
elif isinstance(nargs, str):
|
||||||
|
if nargs not in allowed_nargs:
|
||||||
|
raise CommandArgumentError(f"Invalid nargs value: {nargs}")
|
||||||
|
else:
|
||||||
|
raise CommandArgumentError(f"nargs must be an int or one of {allowed_nargs}")
|
||||||
|
return nargs
|
||||||
|
|
||||||
|
def _normalize_choices(self, choices: Iterable, expected_type: Any) -> list[Any]:
|
||||||
|
if choices is not None:
|
||||||
|
if isinstance(choices, dict):
|
||||||
|
raise CommandArgumentError("choices cannot be a dict")
|
||||||
|
try:
|
||||||
|
choices = list(choices)
|
||||||
|
except TypeError:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
"choices must be iterable (like list, tuple, or set)"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
choices = []
|
||||||
|
for choice in choices:
|
||||||
|
if not isinstance(choice, expected_type):
|
||||||
|
try:
|
||||||
|
expected_type(choice)
|
||||||
|
except Exception:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid choice {choice!r}: not coercible to {expected_type.__name__}"
|
||||||
|
)
|
||||||
|
return choices
|
||||||
|
|
||||||
|
def _validate_default_type(
|
||||||
|
self, default: Any, expected_type: type, dest: str
|
||||||
|
) -> None:
|
||||||
|
"""Validate the default value type."""
|
||||||
|
if default is not None and not isinstance(default, expected_type):
|
||||||
|
try:
|
||||||
|
expected_type(default)
|
||||||
|
except Exception:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Default value {default!r} for '{dest}' cannot be coerced to {expected_type.__name__}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _validate_default_list_type(
|
||||||
|
self, default: list[Any], expected_type: type, dest: str
|
||||||
|
) -> None:
|
||||||
|
if isinstance(default, list):
|
||||||
|
for item in default:
|
||||||
|
if not isinstance(item, expected_type):
|
||||||
|
try:
|
||||||
|
expected_type(item)
|
||||||
|
except Exception:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Default list value {default!r} for '{dest}' cannot be coerced to {expected_type.__name__}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _resolve_default(
|
||||||
|
self, action: ArgumentAction, default: Any, nargs: str | int
|
||||||
|
) -> Any:
|
||||||
|
"""Get the default value for the argument."""
|
||||||
|
if default is None:
|
||||||
|
if action == ArgumentAction.STORE_TRUE:
|
||||||
|
return False
|
||||||
|
elif action == ArgumentAction.STORE_FALSE:
|
||||||
|
return True
|
||||||
|
elif action == ArgumentAction.COUNT:
|
||||||
|
return 0
|
||||||
|
elif action in (ArgumentAction.APPEND, ArgumentAction.EXTEND):
|
||||||
|
return []
|
||||||
|
elif nargs in ("+", "*"):
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
return default
|
||||||
|
|
||||||
|
def _validate_flags(self, flags: tuple[str, ...]) -> None:
|
||||||
|
"""Validate the flags provided for the argument."""
|
||||||
|
if not flags:
|
||||||
|
raise CommandArgumentError("No flags provided")
|
||||||
|
for flag in flags:
|
||||||
|
if not isinstance(flag, str):
|
||||||
|
raise CommandArgumentError(f"Flag '{flag}' must be a string")
|
||||||
|
if flag.startswith("--") and len(flag) < 3:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Flag '{flag}' must be at least 3 characters long"
|
||||||
|
)
|
||||||
|
if flag.startswith("-") and not flag.startswith("--") and len(flag) > 2:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Flag '{flag}' must be a single character or start with '--'"
|
||||||
|
)
|
||||||
|
|
||||||
|
def add_argument(self, *flags, **kwargs):
|
||||||
|
"""Add an argument to the parser.
|
||||||
|
Args:
|
||||||
|
name or flags: Either a name or prefixed flags (e.g. 'faylx', '-f', '--falyx').
|
||||||
|
action: The action to be taken when the argument is encountered.
|
||||||
|
nargs: The number of arguments expected.
|
||||||
|
default: The default value if the argument is not provided.
|
||||||
|
type: The type to which the command-line argument should be converted.
|
||||||
|
choices: A container of the allowable values for the argument.
|
||||||
|
required: Whether or not the argument is required.
|
||||||
|
help: A brief description of the argument.
|
||||||
|
dest: The name of the attribute to be added to the object returned by parse_args().
|
||||||
|
"""
|
||||||
|
self._validate_flags(flags)
|
||||||
|
positional = self._is_positional(flags)
|
||||||
|
dest = self._get_dest_from_flags(flags, kwargs.get("dest"))
|
||||||
|
if dest in self._dest_set:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Destination '{dest}' is already defined.\n"
|
||||||
|
"Merging multiple arguments into the same dest (e.g. positional + flagged) "
|
||||||
|
"is not supported. Define a unique 'dest' for each argument."
|
||||||
|
)
|
||||||
|
self._dest_set.add(dest)
|
||||||
|
action = kwargs.get("action", ArgumentAction.STORE)
|
||||||
|
if not isinstance(action, ArgumentAction):
|
||||||
|
try:
|
||||||
|
action = ArgumentAction(action)
|
||||||
|
except ValueError:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid action '{action}' is not a valid ArgumentAction"
|
||||||
|
)
|
||||||
|
flags = list(flags)
|
||||||
|
nargs = self._validate_nargs(kwargs.get("nargs", 1))
|
||||||
|
default = self._resolve_default(action, kwargs.get("default"), nargs)
|
||||||
|
expected_type = kwargs.get("type", str)
|
||||||
|
if (
|
||||||
|
action in (ArgumentAction.STORE, ArgumentAction.APPEND, ArgumentAction.EXTEND)
|
||||||
|
and default is not None
|
||||||
|
):
|
||||||
|
if isinstance(default, list):
|
||||||
|
self._validate_default_list_type(default, expected_type, dest)
|
||||||
|
else:
|
||||||
|
self._validate_default_type(default, expected_type, dest)
|
||||||
|
choices = self._normalize_choices(kwargs.get("choices"), expected_type)
|
||||||
|
if default is not None and choices and default not in choices:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Default value '{default}' not in allowed choices: {choices}"
|
||||||
|
)
|
||||||
|
required = self._determine_required(
|
||||||
|
kwargs.get("required", False), positional, nargs
|
||||||
|
)
|
||||||
|
argument = Argument(
|
||||||
|
flags=flags,
|
||||||
|
dest=dest,
|
||||||
|
action=action,
|
||||||
|
type=expected_type,
|
||||||
|
default=default,
|
||||||
|
choices=choices,
|
||||||
|
required=required,
|
||||||
|
help=kwargs.get("help", ""),
|
||||||
|
nargs=nargs,
|
||||||
|
positional=positional,
|
||||||
|
)
|
||||||
|
for flag in flags:
|
||||||
|
if flag in self._flag_map:
|
||||||
|
existing = self._flag_map[flag]
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Flag '{flag}' is already used by argument '{existing.dest}'"
|
||||||
|
)
|
||||||
|
self._flag_map[flag] = argument
|
||||||
|
self._arguments.append(argument)
|
||||||
|
if positional:
|
||||||
|
self._positional.append(argument)
|
||||||
|
else:
|
||||||
|
self._keyword.append(argument)
|
||||||
|
|
||||||
|
def get_argument(self, dest: str) -> Argument | None:
|
||||||
|
return next((a for a in self._arguments if a.dest == dest), None)
|
||||||
|
|
||||||
|
def to_definition_list(self) -> list[dict[str, Any]]:
|
||||||
|
defs = []
|
||||||
|
for arg in self._arguments:
|
||||||
|
defs.append(
|
||||||
|
{
|
||||||
|
"flags": arg.flags,
|
||||||
|
"dest": arg.dest,
|
||||||
|
"action": arg.action,
|
||||||
|
"type": arg.type,
|
||||||
|
"choices": arg.choices,
|
||||||
|
"required": arg.required,
|
||||||
|
"nargs": arg.nargs,
|
||||||
|
"positional": arg.positional,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return defs
|
||||||
|
|
||||||
|
def _consume_nargs(
|
||||||
|
self, args: list[str], start: int, spec: Argument
|
||||||
|
) -> tuple[list[str], int]:
|
||||||
|
values = []
|
||||||
|
i = start
|
||||||
|
if isinstance(spec.nargs, int):
|
||||||
|
# assert i + spec.nargs <= len(
|
||||||
|
# args
|
||||||
|
# ), "Not enough arguments provided: shouldn't happen"
|
||||||
|
values = args[i : i + spec.nargs]
|
||||||
|
return values, i + spec.nargs
|
||||||
|
elif spec.nargs == "+":
|
||||||
|
if i >= len(args):
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Expected at least one value for '{spec.dest}'"
|
||||||
|
)
|
||||||
|
while i < len(args) and not args[i].startswith("-"):
|
||||||
|
values.append(args[i])
|
||||||
|
i += 1
|
||||||
|
assert values, "Expected at least one value for '+' nargs: shouldn't happen"
|
||||||
|
return values, i
|
||||||
|
elif spec.nargs == "*":
|
||||||
|
while i < len(args) and not args[i].startswith("-"):
|
||||||
|
values.append(args[i])
|
||||||
|
i += 1
|
||||||
|
return values, i
|
||||||
|
elif spec.nargs == "?":
|
||||||
|
if i < len(args) and not args[i].startswith("-"):
|
||||||
|
return [args[i]], i + 1
|
||||||
|
return [], i
|
||||||
|
else:
|
||||||
|
assert False, "Invalid nargs value: shouldn't happen"
|
||||||
|
|
||||||
|
def _consume_all_positional_args(
|
||||||
|
self,
|
||||||
|
args: list[str],
|
||||||
|
result: dict[str, Any],
|
||||||
|
positional_args: list[Argument],
|
||||||
|
consumed_positional_indicies: set[int],
|
||||||
|
) -> int:
|
||||||
|
remaining_positional_args = [
|
||||||
|
(j, spec)
|
||||||
|
for j, spec in enumerate(positional_args)
|
||||||
|
if j not in consumed_positional_indicies
|
||||||
|
]
|
||||||
|
i = 0
|
||||||
|
|
||||||
|
for j, spec in remaining_positional_args:
|
||||||
|
# estimate how many args the remaining specs might need
|
||||||
|
is_last = j == len(positional_args) - 1
|
||||||
|
remaining = len(args) - i
|
||||||
|
min_required = 0
|
||||||
|
for next_spec in positional_args[j + 1 :]:
|
||||||
|
if isinstance(next_spec.nargs, int):
|
||||||
|
min_required += next_spec.nargs
|
||||||
|
elif next_spec.nargs == "+":
|
||||||
|
min_required += 1
|
||||||
|
elif next_spec.nargs == "?":
|
||||||
|
min_required += 0
|
||||||
|
elif next_spec.nargs == "*":
|
||||||
|
min_required += 0
|
||||||
|
else:
|
||||||
|
assert False, "Invalid nargs value: shouldn't happen"
|
||||||
|
|
||||||
|
slice_args = args[i:] if is_last else args[i : i + (remaining - min_required)]
|
||||||
|
values, new_i = self._consume_nargs(slice_args, 0, spec)
|
||||||
|
i += new_i
|
||||||
|
|
||||||
|
try:
|
||||||
|
typed = [spec.type(v) for v in values]
|
||||||
|
except Exception:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for '{spec.dest}': expected {spec.type.__name__}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if spec.action == ArgumentAction.APPEND:
|
||||||
|
assert result.get(spec.dest) is not None, "dest should not be None"
|
||||||
|
if spec.nargs in (None, 1):
|
||||||
|
result[spec.dest].append(typed[0])
|
||||||
|
else:
|
||||||
|
result[spec.dest].append(typed)
|
||||||
|
elif spec.action == ArgumentAction.EXTEND:
|
||||||
|
assert result.get(spec.dest) is not None, "dest should not be None"
|
||||||
|
result[spec.dest].extend(typed)
|
||||||
|
elif spec.nargs in (None, 1, "?"):
|
||||||
|
result[spec.dest] = typed[0] if len(typed) == 1 else typed
|
||||||
|
else:
|
||||||
|
result[spec.dest] = typed
|
||||||
|
|
||||||
|
if spec.nargs not in ("*", "+"):
|
||||||
|
consumed_positional_indicies.add(j)
|
||||||
|
|
||||||
|
if i < len(args):
|
||||||
|
raise CommandArgumentError(f"Unexpected positional argument: {args[i:]}")
|
||||||
|
|
||||||
|
return i
|
||||||
|
|
||||||
|
def parse_args(
|
||||||
|
self, args: list[str] | None = None, from_validate: bool = False
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Parse Falyx Command arguments."""
|
||||||
|
if args is None:
|
||||||
|
args = []
|
||||||
|
|
||||||
|
result = {arg.dest: deepcopy(arg.default) for arg in self._arguments}
|
||||||
|
positional_args = [arg for arg in self._arguments if arg.positional]
|
||||||
|
consumed_positional_indices: set[int] = set()
|
||||||
|
|
||||||
|
consumed_indices: set[int] = set()
|
||||||
|
i = 0
|
||||||
|
while i < len(args):
|
||||||
|
token = args[i]
|
||||||
|
if token in self._flag_map:
|
||||||
|
spec = self._flag_map[token]
|
||||||
|
action = spec.action
|
||||||
|
|
||||||
|
if action == ArgumentAction.HELP:
|
||||||
|
if not from_validate:
|
||||||
|
self.render_help()
|
||||||
|
raise HelpSignal()
|
||||||
|
elif action == ArgumentAction.STORE_TRUE:
|
||||||
|
result[spec.dest] = True
|
||||||
|
consumed_indices.add(i)
|
||||||
|
i += 1
|
||||||
|
elif action == ArgumentAction.STORE_FALSE:
|
||||||
|
result[spec.dest] = False
|
||||||
|
consumed_indices.add(i)
|
||||||
|
i += 1
|
||||||
|
elif action == ArgumentAction.COUNT:
|
||||||
|
result[spec.dest] = result.get(spec.dest, 0) + 1
|
||||||
|
consumed_indices.add(i)
|
||||||
|
i += 1
|
||||||
|
elif action == ArgumentAction.APPEND:
|
||||||
|
assert result.get(spec.dest) is not None, "dest should not be None"
|
||||||
|
values, new_i = self._consume_nargs(args, i + 1, spec)
|
||||||
|
try:
|
||||||
|
typed_values = [spec.type(value) for value in values]
|
||||||
|
except ValueError:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for '{spec.dest}': expected {spec.type.__name__}"
|
||||||
|
)
|
||||||
|
if spec.nargs in (None, 1):
|
||||||
|
try:
|
||||||
|
result[spec.dest].append(spec.type(values[0]))
|
||||||
|
except ValueError:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for '{spec.dest}': expected {spec.type.__name__}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
result[spec.dest].append(typed_values)
|
||||||
|
consumed_indices.update(range(i, new_i))
|
||||||
|
i = new_i
|
||||||
|
elif action == ArgumentAction.EXTEND:
|
||||||
|
assert result.get(spec.dest) is not None, "dest should not be None"
|
||||||
|
values, new_i = self._consume_nargs(args, i + 1, spec)
|
||||||
|
try:
|
||||||
|
typed_values = [spec.type(value) for value in values]
|
||||||
|
except ValueError:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for '{spec.dest}': expected {spec.type.__name__}"
|
||||||
|
)
|
||||||
|
result[spec.dest].extend(typed_values)
|
||||||
|
consumed_indices.update(range(i, new_i))
|
||||||
|
i = new_i
|
||||||
|
else:
|
||||||
|
values, new_i = self._consume_nargs(args, i + 1, spec)
|
||||||
|
try:
|
||||||
|
typed_values = [spec.type(v) for v in values]
|
||||||
|
except ValueError:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for '{spec.dest}': expected {spec.type.__name__}"
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
spec.nargs in (None, 1, "?")
|
||||||
|
and spec.action != ArgumentAction.APPEND
|
||||||
|
):
|
||||||
|
result[spec.dest] = (
|
||||||
|
typed_values[0] if len(typed_values) == 1 else typed_values
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
result[spec.dest] = typed_values
|
||||||
|
consumed_indices.update(range(i, new_i))
|
||||||
|
i = new_i
|
||||||
|
else:
|
||||||
|
# Get the next flagged argument index if it exists
|
||||||
|
next_flagged_index = -1
|
||||||
|
for index, arg in enumerate(args[i:], start=i):
|
||||||
|
if arg.startswith("-"):
|
||||||
|
next_flagged_index = index
|
||||||
|
break
|
||||||
|
if next_flagged_index == -1:
|
||||||
|
next_flagged_index = len(args)
|
||||||
|
|
||||||
|
args_consumed = self._consume_all_positional_args(
|
||||||
|
args[i:next_flagged_index],
|
||||||
|
result,
|
||||||
|
positional_args,
|
||||||
|
consumed_positional_indices,
|
||||||
|
)
|
||||||
|
i += args_consumed
|
||||||
|
|
||||||
|
# Required validation
|
||||||
|
for spec in self._arguments:
|
||||||
|
if spec.dest == "help":
|
||||||
|
continue
|
||||||
|
if spec.required and not result.get(spec.dest):
|
||||||
|
raise CommandArgumentError(f"Missing required argument: {spec.dest}")
|
||||||
|
|
||||||
|
if spec.choices and result.get(spec.dest) not in spec.choices:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for {spec.dest}: must be one of {spec.choices}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if isinstance(spec.nargs, int) and spec.nargs > 1:
|
||||||
|
if not isinstance(result.get(spec.dest), list):
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for {spec.dest}: expected a list"
|
||||||
|
)
|
||||||
|
if spec.action == ArgumentAction.APPEND:
|
||||||
|
if not isinstance(result[spec.dest], list):
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for {spec.dest}: expected a list"
|
||||||
|
)
|
||||||
|
for group in result[spec.dest]:
|
||||||
|
if len(group) % spec.nargs != 0:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid number of values for {spec.dest}: expected a multiple of {spec.nargs}"
|
||||||
|
)
|
||||||
|
elif spec.action == ArgumentAction.EXTEND:
|
||||||
|
if not isinstance(result[spec.dest], list):
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid value for {spec.dest}: expected a list"
|
||||||
|
)
|
||||||
|
if len(result[spec.dest]) % spec.nargs != 0:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid number of values for {spec.dest}: expected a multiple of {spec.nargs}"
|
||||||
|
)
|
||||||
|
elif len(result[spec.dest]) != spec.nargs:
|
||||||
|
raise CommandArgumentError(
|
||||||
|
f"Invalid number of values for {spec.dest}: expected {spec.nargs}, got {len(result[spec.dest])}"
|
||||||
|
)
|
||||||
|
|
||||||
|
result.pop("help", None)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def parse_args_split(
|
||||||
|
self, args: list[str], from_validate: bool = False
|
||||||
|
) -> tuple[tuple[Any, ...], dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Returns:
|
||||||
|
tuple[args, kwargs] - Positional arguments in defined order,
|
||||||
|
followed by keyword argument mapping.
|
||||||
|
"""
|
||||||
|
parsed = self.parse_args(args, from_validate)
|
||||||
|
args_list = []
|
||||||
|
kwargs_dict = {}
|
||||||
|
for arg in self._arguments:
|
||||||
|
if arg.dest == "help":
|
||||||
|
continue
|
||||||
|
if arg.positional:
|
||||||
|
args_list.append(parsed[arg.dest])
|
||||||
|
else:
|
||||||
|
kwargs_dict[arg.dest] = parsed[arg.dest]
|
||||||
|
return tuple(args_list), kwargs_dict
|
||||||
|
|
||||||
|
def render_help(self) -> None:
|
||||||
|
# Options
|
||||||
|
# Add all keyword arguments to the options list
|
||||||
|
options_list = []
|
||||||
|
for arg in self._keyword:
|
||||||
|
choice_text = arg.get_choice_text()
|
||||||
|
if choice_text:
|
||||||
|
options_list.extend([f"[{arg.flags[0]} {choice_text}]"])
|
||||||
|
else:
|
||||||
|
options_list.extend([f"[{arg.flags[0]}]"])
|
||||||
|
|
||||||
|
# Add positional arguments to the options list
|
||||||
|
for arg in self._positional:
|
||||||
|
choice_text = arg.get_choice_text()
|
||||||
|
if isinstance(arg.nargs, int):
|
||||||
|
choice_text = " ".join([choice_text] * arg.nargs)
|
||||||
|
options_list.append(escape(choice_text))
|
||||||
|
|
||||||
|
options_text = " ".join(options_list)
|
||||||
|
command_keys = " | ".join(
|
||||||
|
[f"[{self.command_style}]{self.command_key}[/{self.command_style}]"]
|
||||||
|
+ [
|
||||||
|
f"[{self.command_style}]{alias}[/{self.command_style}]"
|
||||||
|
for alias in self.aliases
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
usage = f"usage: {command_keys} {options_text}"
|
||||||
|
self.console.print(f"[bold]{usage}[/bold]\n")
|
||||||
|
|
||||||
|
# Description
|
||||||
|
if self.help_text:
|
||||||
|
self.console.print(self.help_text + "\n")
|
||||||
|
|
||||||
|
# Arguments
|
||||||
|
if self._arguments:
|
||||||
|
if self._positional:
|
||||||
|
self.console.print("[bold]positional:[/bold]")
|
||||||
|
for arg in self._positional:
|
||||||
|
flags = arg.get_positional_text()
|
||||||
|
arg_line = Text(f" {flags:<30} ")
|
||||||
|
help_text = arg.help or ""
|
||||||
|
arg_line.append(help_text)
|
||||||
|
self.console.print(arg_line)
|
||||||
|
self.console.print("[bold]options:[/bold]")
|
||||||
|
for arg in self._keyword:
|
||||||
|
flags = ", ".join(arg.flags)
|
||||||
|
flags_choice = f"{flags} {arg.get_choice_text()}"
|
||||||
|
arg_line = Text(f" {flags_choice:<30} ")
|
||||||
|
help_text = arg.help or ""
|
||||||
|
arg_line.append(help_text)
|
||||||
|
self.console.print(arg_line)
|
||||||
|
|
||||||
|
# Epilogue
|
||||||
|
if self.help_epilogue:
|
||||||
|
self.console.print("\n" + self.help_epilogue, style="dim")
|
||||||
|
|
||||||
|
def __eq__(self, other: object) -> bool:
|
||||||
|
if not isinstance(other, CommandArgumentParser):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def sorted_args(parser):
|
||||||
|
return sorted(parser._arguments, key=lambda a: a.dest)
|
||||||
|
|
||||||
|
return sorted_args(self) == sorted_args(other)
|
||||||
|
|
||||||
|
def __hash__(self) -> int:
|
||||||
|
return hash(tuple(sorted(self._arguments, key=lambda a: a.dest)))
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
positional = sum(arg.positional for arg in self._arguments)
|
||||||
|
required = sum(arg.required for arg in self._arguments)
|
||||||
|
return (
|
||||||
|
f"CommandArgumentParser(args={len(self._arguments)}, "
|
||||||
|
f"flags={len(self._flag_map)}, dests={len(self._dest_set)}, "
|
||||||
|
f"required={required}, positional={positional})"
|
||||||
|
)
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return str(self)
|
@ -2,7 +2,7 @@
|
|||||||
"""parsers.py
|
"""parsers.py
|
||||||
This module contains the argument parsers used for the Falyx CLI.
|
This module contains the argument parsers used for the Falyx CLI.
|
||||||
"""
|
"""
|
||||||
from argparse import ArgumentParser, Namespace, _SubParsersAction
|
from argparse import REMAINDER, ArgumentParser, Namespace, _SubParsersAction
|
||||||
from dataclasses import asdict, dataclass
|
from dataclasses import asdict, dataclass
|
||||||
from typing import Any, Sequence
|
from typing import Any, Sequence
|
||||||
|
|
||||||
@ -114,6 +114,12 @@ def get_arg_parsers(
|
|||||||
help="Skip confirmation prompts",
|
help="Skip confirmation prompts",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
run_group.add_argument(
|
||||||
|
"command_args",
|
||||||
|
nargs=REMAINDER,
|
||||||
|
help="Arguments to pass to the command (if applicable)",
|
||||||
|
)
|
||||||
|
|
||||||
run_all_parser = subparsers.add_parser(
|
run_all_parser = subparsers.add_parser(
|
||||||
"run-all", help="Run all commands with a given tag"
|
"run-all", help="Run all commands with a given tag"
|
||||||
)
|
)
|
71
falyx/parsers/signature.py
Normal file
71
falyx/parsers/signature.py
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
import inspect
|
||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
from falyx import logger
|
||||||
|
|
||||||
|
|
||||||
|
def infer_args_from_func(
|
||||||
|
func: Callable[[Any], Any],
|
||||||
|
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Infer argument definitions from a callable's signature.
|
||||||
|
Returns a list of kwargs suitable for CommandArgumentParser.add_argument.
|
||||||
|
"""
|
||||||
|
arg_metadata = arg_metadata or {}
|
||||||
|
signature = inspect.signature(func)
|
||||||
|
arg_defs = []
|
||||||
|
|
||||||
|
for name, param in signature.parameters.items():
|
||||||
|
raw_metadata = arg_metadata.get(name, {})
|
||||||
|
metadata = (
|
||||||
|
{"help": raw_metadata} if isinstance(raw_metadata, str) else raw_metadata
|
||||||
|
)
|
||||||
|
|
||||||
|
if param.kind not in (
|
||||||
|
inspect.Parameter.POSITIONAL_ONLY,
|
||||||
|
inspect.Parameter.POSITIONAL_OR_KEYWORD,
|
||||||
|
inspect.Parameter.KEYWORD_ONLY,
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
|
arg_type = (
|
||||||
|
param.annotation if param.annotation is not inspect.Parameter.empty else str
|
||||||
|
)
|
||||||
|
default = param.default if param.default is not inspect.Parameter.empty else None
|
||||||
|
is_required = param.default is inspect.Parameter.empty
|
||||||
|
if is_required:
|
||||||
|
flags = [f"{name.replace('_', '-')}"]
|
||||||
|
else:
|
||||||
|
flags = [f"--{name.replace('_', '-')}"]
|
||||||
|
action = "store"
|
||||||
|
nargs: int | str = 1
|
||||||
|
|
||||||
|
if arg_type is bool:
|
||||||
|
if param.default is False:
|
||||||
|
action = "store_true"
|
||||||
|
else:
|
||||||
|
action = "store_false"
|
||||||
|
|
||||||
|
if arg_type is list:
|
||||||
|
action = "append"
|
||||||
|
if is_required:
|
||||||
|
nargs = "+"
|
||||||
|
else:
|
||||||
|
nargs = "*"
|
||||||
|
|
||||||
|
arg_defs.append(
|
||||||
|
{
|
||||||
|
"flags": flags,
|
||||||
|
"dest": name,
|
||||||
|
"type": arg_type,
|
||||||
|
"default": default,
|
||||||
|
"required": is_required,
|
||||||
|
"nargs": nargs,
|
||||||
|
"action": action,
|
||||||
|
"help": metadata.get("help", ""),
|
||||||
|
"choices": metadata.get("choices"),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return arg_defs
|
33
falyx/parsers/utils.py
Normal file
33
falyx/parsers/utils.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from falyx import logger
|
||||||
|
from falyx.action.action import Action, ChainedAction, ProcessAction
|
||||||
|
from falyx.parsers.signature import infer_args_from_func
|
||||||
|
|
||||||
|
|
||||||
|
def same_argument_definitions(
|
||||||
|
actions: list[Any],
|
||||||
|
arg_metadata: dict[str, str | dict[str, Any]] | None = None,
|
||||||
|
) -> list[dict[str, Any]] | None:
|
||||||
|
arg_sets = []
|
||||||
|
for action in actions:
|
||||||
|
if isinstance(action, (Action, ProcessAction)):
|
||||||
|
arg_defs = infer_args_from_func(action.action, arg_metadata)
|
||||||
|
elif isinstance(action, ChainedAction):
|
||||||
|
if action.actions:
|
||||||
|
action = action.actions[0]
|
||||||
|
if isinstance(action, Action):
|
||||||
|
arg_defs = infer_args_from_func(action.action, arg_metadata)
|
||||||
|
elif callable(action):
|
||||||
|
arg_defs = infer_args_from_func(action, arg_metadata)
|
||||||
|
elif callable(action):
|
||||||
|
arg_defs = infer_args_from_func(action, arg_metadata)
|
||||||
|
else:
|
||||||
|
logger.debug("Auto args unsupported for action: %s", action)
|
||||||
|
return None
|
||||||
|
arg_sets.append(arg_defs)
|
||||||
|
|
||||||
|
first = arg_sets[0]
|
||||||
|
if all(arg_set == first for arg_set in arg_sets[1:]):
|
||||||
|
return first
|
||||||
|
return None
|
@ -2,10 +2,16 @@
|
|||||||
"""protocols.py"""
|
"""protocols.py"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Any, Awaitable, Protocol
|
from typing import Any, Awaitable, Protocol, runtime_checkable
|
||||||
|
|
||||||
from falyx.action.action import BaseAction
|
from falyx.action.action import BaseAction
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
class ActionFactoryProtocol(Protocol):
|
class ActionFactoryProtocol(Protocol):
|
||||||
async def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[BaseAction]: ...
|
async def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[BaseAction]: ...
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class ArgParserProtocol(Protocol):
|
||||||
|
def __call__(self, args: list[str]) -> tuple[tuple, dict]: ...
|
||||||
|
@ -29,3 +29,10 @@ class CancelSignal(FlowSignal):
|
|||||||
|
|
||||||
def __init__(self, message: str = "Cancel signal received."):
|
def __init__(self, message: str = "Cancel signal received."):
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
|
|
||||||
|
|
||||||
|
class HelpSignal(FlowSignal):
|
||||||
|
"""Raised to display help information."""
|
||||||
|
|
||||||
|
def __init__(self, message: str = "Help signal received."):
|
||||||
|
super().__init__(message)
|
||||||
|
@ -1 +1 @@
|
|||||||
__version__ = "0.1.27"
|
__version__ = "0.1.29"
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "falyx"
|
name = "falyx"
|
||||||
version = "0.1.27"
|
version = "0.1.29"
|
||||||
description = "Reliable and introspectable async CLI action framework."
|
description = "Reliable and introspectable async CLI action framework."
|
||||||
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
|
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
678
tests/test_command_argument_parser.py
Normal file
678
tests/test_command_argument_parser.py
Normal file
@ -0,0 +1,678 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from falyx.exceptions import CommandArgumentError
|
||||||
|
from falyx.parsers import ArgumentAction, CommandArgumentParser
|
||||||
|
from falyx.signals import HelpSignal
|
||||||
|
|
||||||
|
|
||||||
|
def build_parser_and_parse(args, config):
|
||||||
|
cap = CommandArgumentParser()
|
||||||
|
config(cap)
|
||||||
|
return cap.parse_args(args)
|
||||||
|
|
||||||
|
|
||||||
|
def test_none():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("--foo", type=str)
|
||||||
|
|
||||||
|
parsed = build_parser_and_parse(None, config)
|
||||||
|
assert parsed["foo"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_append_multiple_flags():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("--tag", action=ArgumentAction.APPEND, type=str)
|
||||||
|
|
||||||
|
parsed = build_parser_and_parse(["--tag", "a", "--tag", "b", "--tag", "c"], config)
|
||||||
|
assert parsed["tag"] == ["a", "b", "c"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_positional_nargs_plus_and_single():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("files", nargs="+", type=str)
|
||||||
|
parser.add_argument("mode", nargs=1)
|
||||||
|
|
||||||
|
parsed = build_parser_and_parse(["a", "b", "c", "prod"], config)
|
||||||
|
assert parsed["files"] == ["a", "b", "c"]
|
||||||
|
assert parsed["mode"] == "prod"
|
||||||
|
|
||||||
|
|
||||||
|
def test_type_validation_failure():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("--count", type=int)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
build_parser_and_parse(["--count", "abc"], config)
|
||||||
|
|
||||||
|
|
||||||
|
def test_required_field_missing():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("--env", type=str, required=True)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
build_parser_and_parse([], config)
|
||||||
|
|
||||||
|
|
||||||
|
def test_choices_enforced():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("--mode", choices=["dev", "prod"])
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
build_parser_and_parse(["--mode", "staging"], config)
|
||||||
|
|
||||||
|
|
||||||
|
def test_boolean_flags():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("--debug", action=ArgumentAction.STORE_TRUE)
|
||||||
|
parser.add_argument("--no-debug", action=ArgumentAction.STORE_FALSE)
|
||||||
|
|
||||||
|
parsed = build_parser_and_parse(["--debug", "--no-debug"], config)
|
||||||
|
assert parsed["debug"] is True
|
||||||
|
assert parsed["no_debug"] is False
|
||||||
|
parsed = build_parser_and_parse([], config)
|
||||||
|
print(parsed)
|
||||||
|
assert parsed["debug"] is False
|
||||||
|
assert parsed["no_debug"] is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_count_action():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("-v", action=ArgumentAction.COUNT)
|
||||||
|
|
||||||
|
parsed = build_parser_and_parse(["-v", "-v", "-v"], config)
|
||||||
|
assert parsed["v"] == 3
|
||||||
|
|
||||||
|
|
||||||
|
def test_nargs_star():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("args", nargs="*", type=str)
|
||||||
|
|
||||||
|
parsed = build_parser_and_parse(["one", "two", "three"], config)
|
||||||
|
assert parsed["args"] == ["one", "two", "three"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_flag_and_positional_mix():
|
||||||
|
def config(parser):
|
||||||
|
parser.add_argument("--env", type=str)
|
||||||
|
parser.add_argument("tasks", nargs="+")
|
||||||
|
|
||||||
|
parsed = build_parser_and_parse(["--env", "prod", "build", "test"], config)
|
||||||
|
assert parsed["env"] == "prod"
|
||||||
|
assert parsed["tasks"] == ["build", "test"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_duplicate_dest_fails():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--foo", dest="shared")
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("bar", dest="shared")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_positional_flag_conflict():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ✅ Single positional argument should work
|
||||||
|
parser.add_argument("faylx")
|
||||||
|
|
||||||
|
# ❌ Multiple positional flags is invalid
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("falyx", "test")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_positional_and_flag_conflict():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Cannot mix positional and optional in one declaration
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("faylx", "--falyx")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_multiple_optional_flags_same_dest():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ✅ Valid: multiple flags for same dest
|
||||||
|
parser.add_argument("-f", "--falyx")
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.dest == "falyx"
|
||||||
|
assert arg.flags == ["-f", "--falyx"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_flag_dest_conflict():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# First one is fine
|
||||||
|
parser.add_argument("falyx")
|
||||||
|
|
||||||
|
# ❌ Cannot reuse dest name with another flag or positional
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--test", dest="falyx")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_flag_and_positional_conflict_dest_inference():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ "--falyx" and "falyx" result in dest conflict
|
||||||
|
parser.add_argument("--falyx")
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("falyx")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_multiple_flags_custom_dest():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ✅ Multiple flags with explicit dest
|
||||||
|
parser.add_argument("-f", "--falyx", "--test", dest="falyx")
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.dest == "falyx"
|
||||||
|
assert arg.flags == ["-f", "--falyx", "--test"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_multiple_flags_dest():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ✅ Multiple flags with implicit dest first non -flag
|
||||||
|
parser.add_argument("-f", "--falyx", "--test")
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.dest == "falyx"
|
||||||
|
assert arg.flags == ["-f", "--falyx", "--test"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_single_flag_dest():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ✅ Single flag with explicit dest
|
||||||
|
parser.add_argument("-f")
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.dest == "f"
|
||||||
|
assert arg.flags == ["-f"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_bad_dest():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Invalid dest name
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", dest="1falyx")
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", dest="falyx%")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_bad_flag():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Invalid flag name
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--1falyx")
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--!falyx")
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("_")
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument(None)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument(0)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("-")
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--")
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("-asdf")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_duplicate_flags():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
parser.add_argument("--falyx")
|
||||||
|
|
||||||
|
# ❌ Duplicate flag
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--test", "--falyx")
|
||||||
|
|
||||||
|
# ❌ Duplicate flag
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("falyx")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_no_flags():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ No flags provided
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument()
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_default_value():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ✅ Default value provided
|
||||||
|
parser.add_argument("--falyx", default="default_value")
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.dest == "falyx"
|
||||||
|
assert arg.flags == ["--falyx"]
|
||||||
|
assert arg.default == "default_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_bad_default():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Invalid default value
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", type=int, default="1falyx")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_bad_default_list():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Invalid default value
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", type=int, default=["a", 2, 3])
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_bad_action():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Invalid action
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", action="invalid_action")
|
||||||
|
|
||||||
|
# ❌ Invalid action type
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", action=123)
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_default_not_in_choices():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Default value not in choices
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", choices=["a", "b"], default="c")
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_choices():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ✅ Choices provided
|
||||||
|
parser.add_argument("--falyx", choices=["a", "b", "c"])
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.dest == "falyx"
|
||||||
|
assert arg.flags == ["--falyx"]
|
||||||
|
assert arg.choices == ["a", "b", "c"]
|
||||||
|
|
||||||
|
args = parser.parse_args(["--falyx", "a"])
|
||||||
|
assert args["falyx"] == "a"
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args(["--falyx", "d"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_choices_invalid():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Invalid choices
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", choices=["a", "b"], default="c")
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--bad", choices=123)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--bad3", choices={1: "a", 2: "b"})
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--bad4", choices=["a", "b"], type=int)
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_bad_nargs():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
|
||||||
|
# ❌ Invalid nargs value
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", nargs="invalid")
|
||||||
|
|
||||||
|
# ❌ Invalid nargs type
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", nargs=123)
|
||||||
|
|
||||||
|
# ❌ Invalid nargs type
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("--falyx", nargs=None)
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_nargs():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
# ✅ Valid nargs value
|
||||||
|
parser.add_argument("--falyx", nargs=2)
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.dest == "falyx"
|
||||||
|
assert arg.flags == ["--falyx"]
|
||||||
|
assert arg.nargs == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_argument_valid_nargs():
|
||||||
|
# Valid nargs int, +, * and ?
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--falyx", nargs="+")
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.nargs == "+"
|
||||||
|
|
||||||
|
parser.add_argument("--test", nargs="*")
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.nargs == "*"
|
||||||
|
|
||||||
|
parser.add_argument("--test2", nargs="?")
|
||||||
|
arg = parser._arguments[-1]
|
||||||
|
assert arg.nargs == "?"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_argument():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--falyx", type=str, default="default_value")
|
||||||
|
arg = parser.get_argument("falyx")
|
||||||
|
assert arg.dest == "falyx"
|
||||||
|
assert arg.flags == ["--falyx"]
|
||||||
|
assert arg.default == "default_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", nargs="+", type=str)
|
||||||
|
parser.add_argument("mode", nargs=1)
|
||||||
|
|
||||||
|
args = parser.parse_args(["a", "b", "c"])
|
||||||
|
|
||||||
|
assert args["files"] == ["a", "b"]
|
||||||
|
assert args["mode"] == "c"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_plus():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", nargs="+", type=str)
|
||||||
|
|
||||||
|
args = parser.parse_args(["a", "b", "c"])
|
||||||
|
assert args["files"] == ["a", "b", "c"]
|
||||||
|
|
||||||
|
args = parser.parse_args(["a"])
|
||||||
|
assert args["files"] == ["a"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_flagged_nargs_plus():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--files", nargs="+", type=str)
|
||||||
|
|
||||||
|
args = parser.parse_args(["--files", "a", "b", "c"])
|
||||||
|
assert args["files"] == ["a", "b", "c"]
|
||||||
|
|
||||||
|
args = parser.parse_args(["--files", "a"])
|
||||||
|
print(args)
|
||||||
|
assert args["files"] == ["a"]
|
||||||
|
|
||||||
|
args = parser.parse_args([])
|
||||||
|
assert args["files"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_numbered_nargs():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", nargs=2, type=str)
|
||||||
|
|
||||||
|
args = parser.parse_args(["a", "b"])
|
||||||
|
assert args["files"] == ["a", "b"]
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
args = parser.parse_args(["a"])
|
||||||
|
print(args)
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_zero():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.add_argument("files", nargs=0, type=str)
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_more_than_expected():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", nargs=2, type=str)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args(["a", "b", "c", "d"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_one_or_none():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", nargs="?", type=str)
|
||||||
|
|
||||||
|
args = parser.parse_args(["a"])
|
||||||
|
assert args["files"] == "a"
|
||||||
|
|
||||||
|
args = parser.parse_args([])
|
||||||
|
assert args["files"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_positional():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", nargs="*", type=str)
|
||||||
|
|
||||||
|
args = parser.parse_args(["a", "b", "c"])
|
||||||
|
assert args["files"] == ["a", "b", "c"]
|
||||||
|
|
||||||
|
args = parser.parse_args([])
|
||||||
|
assert args["files"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_positional_plus():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", nargs="+", type=str)
|
||||||
|
|
||||||
|
args = parser.parse_args(["a", "b", "c"])
|
||||||
|
assert args["files"] == ["a", "b", "c"]
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
args = parser.parse_args([])
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_multiple_positional():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", nargs="+", type=str)
|
||||||
|
parser.add_argument("mode", nargs=1)
|
||||||
|
parser.add_argument("action", nargs="?")
|
||||||
|
parser.add_argument("target", nargs="*")
|
||||||
|
parser.add_argument("extra", nargs="+")
|
||||||
|
|
||||||
|
args = parser.parse_args(["a", "b", "c", "d", "e"])
|
||||||
|
assert args["files"] == ["a", "b", "c"]
|
||||||
|
assert args["mode"] == "d"
|
||||||
|
assert args["action"] == []
|
||||||
|
assert args["target"] == []
|
||||||
|
assert args["extra"] == ["e"]
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args([])
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_invalid_positional_arguments():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("numbers", nargs="*", type=int)
|
||||||
|
parser.add_argument("mode", nargs=1)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args(["1", "2", "c", "d"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_append():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int)
|
||||||
|
|
||||||
|
args = parser.parse_args(["--numbers", "1", "--numbers", "2", "--numbers", "3"])
|
||||||
|
assert args["numbers"] == [1, 2, 3]
|
||||||
|
|
||||||
|
args = parser.parse_args(["--numbers", "1"])
|
||||||
|
assert args["numbers"] == [1]
|
||||||
|
|
||||||
|
args = parser.parse_args([])
|
||||||
|
assert args["numbers"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_nargs_append():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("numbers", action=ArgumentAction.APPEND, type=int, nargs="*")
|
||||||
|
parser.add_argument("--mode")
|
||||||
|
|
||||||
|
args = parser.parse_args(["1", "2", "3", "--mode", "numbers", "4", "5"])
|
||||||
|
assert args["numbers"] == [[1, 2, 3], [4, 5]]
|
||||||
|
|
||||||
|
args = parser.parse_args(["1"])
|
||||||
|
assert args["numbers"] == [[1]]
|
||||||
|
|
||||||
|
args = parser.parse_args([])
|
||||||
|
assert args["numbers"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_append_flagged_invalid_type():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--numbers", action=ArgumentAction.APPEND, type=int)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args(["--numbers", "a"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_append_groups_nargs():
|
||||||
|
cap = CommandArgumentParser()
|
||||||
|
cap.add_argument("--item", action=ArgumentAction.APPEND, type=str, nargs=2)
|
||||||
|
|
||||||
|
parsed = cap.parse_args(["--item", "a", "b", "--item", "c", "d"])
|
||||||
|
assert parsed["item"] == [["a", "b"], ["c", "d"]]
|
||||||
|
|
||||||
|
|
||||||
|
def test_extend_flattened():
|
||||||
|
cap = CommandArgumentParser()
|
||||||
|
cap.add_argument("--value", action=ArgumentAction.EXTEND, type=str)
|
||||||
|
|
||||||
|
parsed = cap.parse_args(["--value", "x", "--value", "y"])
|
||||||
|
assert parsed["value"] == ["x", "y"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_args_split_order():
|
||||||
|
cap = CommandArgumentParser()
|
||||||
|
cap.add_argument("a")
|
||||||
|
cap.add_argument("--x")
|
||||||
|
cap.add_argument("b", nargs="*")
|
||||||
|
args, kwargs = cap.parse_args_split(["1", "--x", "100", "2"])
|
||||||
|
assert args == ("1", ["2"])
|
||||||
|
assert kwargs == {"x": "100"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_help_signal_triggers():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--foo")
|
||||||
|
with pytest.raises(HelpSignal):
|
||||||
|
parser.parse_args(["--help"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_parser_defaults():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
with pytest.raises(HelpSignal):
|
||||||
|
parser.parse_args(["--help"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_extend_basic():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--tag", action=ArgumentAction.EXTEND, type=str)
|
||||||
|
|
||||||
|
args = parser.parse_args(["--tag", "a", "--tag", "b", "--tag", "c"])
|
||||||
|
assert args["tag"] == ["a", "b", "c"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_extend_nargs_2():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--pair", action=ArgumentAction.EXTEND, type=str, nargs=2)
|
||||||
|
|
||||||
|
args = parser.parse_args(["--pair", "a", "b", "--pair", "c", "d"])
|
||||||
|
assert args["pair"] == ["a", "b", "c", "d"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_extend_nargs_star():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--files", action=ArgumentAction.EXTEND, type=str, nargs="*")
|
||||||
|
|
||||||
|
args = parser.parse_args(["--files", "x", "y", "z"])
|
||||||
|
assert args["files"] == ["x", "y", "z"]
|
||||||
|
|
||||||
|
args = parser.parse_args(["--files"])
|
||||||
|
assert args["files"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_extend_nargs_plus():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--inputs", action=ArgumentAction.EXTEND, type=int, nargs="+")
|
||||||
|
|
||||||
|
args = parser.parse_args(["--inputs", "1", "2", "3", "--inputs", "4"])
|
||||||
|
assert args["inputs"] == [1, 2, 3, 4]
|
||||||
|
|
||||||
|
|
||||||
|
def test_extend_invalid_type():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--nums", action=ArgumentAction.EXTEND, type=int)
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args(["--nums", "a"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_greedy_invalid_type():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--nums", nargs="*", type=int)
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args(["--nums", "a"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_append_vs_extend_behavior():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2)
|
||||||
|
parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2)
|
||||||
|
|
||||||
|
args = parser.parse_args(
|
||||||
|
["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3", "4"]
|
||||||
|
)
|
||||||
|
assert args["x"] == [["a", "b"], ["c", "d"]]
|
||||||
|
assert args["y"] == ["1", "2", "3", "4"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_append_vs_extend_behavior_error():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("--x", action=ArgumentAction.APPEND, nargs=2)
|
||||||
|
parser.add_argument("--y", action=ArgumentAction.EXTEND, nargs=2)
|
||||||
|
|
||||||
|
# This should raise an error because the last argument is not a valid pair
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args(["--x", "a", "b", "--x", "c", "d", "--y", "1", "2", "--y", "3"])
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args(["--x", "a", "b", "--x", "c", "--y", "1", "--y", "3", "4"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_extend_positional():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="*")
|
||||||
|
|
||||||
|
args = parser.parse_args(["a", "b", "c"])
|
||||||
|
assert args["files"] == ["a", "b", "c"]
|
||||||
|
|
||||||
|
args = parser.parse_args([])
|
||||||
|
assert args["files"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_extend_positional_nargs():
|
||||||
|
parser = CommandArgumentParser()
|
||||||
|
parser.add_argument("files", action=ArgumentAction.EXTEND, type=str, nargs="+")
|
||||||
|
|
||||||
|
args = parser.parse_args(["a", "b", "c"])
|
||||||
|
assert args["files"] == ["a", "b", "c"]
|
||||||
|
|
||||||
|
with pytest.raises(CommandArgumentError):
|
||||||
|
parser.parse_args([])
|
Reference in New Issue
Block a user