Compare commits
3 Commits
dc1764e752
...
fddc3ea8d9
Author | SHA1 | Date | |
---|---|---|---|
fddc3ea8d9 | |||
9b9f6434a4
|
|||
c15e3afa5e
|
@ -21,11 +21,13 @@ async def test_args(
|
||||
service: str,
|
||||
place: Place = Place.NEW_YORK,
|
||||
region: str = "us-east-1",
|
||||
tag: str | None = None,
|
||||
verbose: bool | None = None,
|
||||
number: int | None = None,
|
||||
) -> str:
|
||||
if verbose:
|
||||
print(f"Deploying {service} to {region} at {place}...")
|
||||
return f"{service} deployed to {region} at {place}"
|
||||
print(f"Deploying {service}:{tag}:{number} to {region} at {place}...")
|
||||
return f"{service}:{tag}:{number} deployed to {region} at {place}"
|
||||
|
||||
|
||||
def default_config(parser: CommandArgumentParser) -> None:
|
||||
@ -55,6 +57,17 @@ def default_config(parser: CommandArgumentParser) -> None:
|
||||
action="store_bool_optional",
|
||||
help="Enable verbose output.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--tag",
|
||||
type=str,
|
||||
help="Optional tag for the deployment.",
|
||||
suggestions=["latest", "stable", "beta"],
|
||||
)
|
||||
parser.add_argument(
|
||||
"--number",
|
||||
type=int,
|
||||
help="Optional number argument.",
|
||||
)
|
||||
|
||||
|
||||
flx = Falyx("Argument Examples")
|
||||
|
@ -29,6 +29,26 @@ class FalyxCompleter(Completer):
|
||||
yield from self._suggest_commands(tokens[0] if tokens else "")
|
||||
return
|
||||
|
||||
# Identify command
|
||||
command_key = tokens[0].upper()
|
||||
command = self.falyx._name_map.get(command_key)
|
||||
if not command or not command.arg_parser:
|
||||
return
|
||||
|
||||
# If at end of token, e.g., "--t" vs "--tag ", add a stub so suggest_next sees it
|
||||
parsed_args = tokens[1:] if cursor_at_end_of_token else tokens[1:-1]
|
||||
stub = "" if cursor_at_end_of_token else tokens[-1]
|
||||
|
||||
try:
|
||||
suggestions = command.arg_parser.suggest_next(
|
||||
parsed_args + ([stub] if stub else [])
|
||||
)
|
||||
for suggestion in suggestions:
|
||||
if suggestion.startswith(stub):
|
||||
yield Completion(suggestion, start_position=-len(stub))
|
||||
except Exception:
|
||||
return
|
||||
|
||||
def _suggest_commands(self, prefix: str) -> Iterable[Completion]:
|
||||
prefix = prefix.upper()
|
||||
keys = [self.falyx.exit_command.key]
|
||||
|
@ -507,7 +507,6 @@ class Falyx:
|
||||
message=self.prompt,
|
||||
multiline=False,
|
||||
completer=self._get_completer(),
|
||||
reserve_space_for_menu=1,
|
||||
validator=CommandValidator(self, self._get_validator_error_message()),
|
||||
bottom_toolbar=self._get_bottom_bar_render(),
|
||||
key_bindings=self.key_bindings,
|
||||
|
@ -26,6 +26,7 @@ class Argument:
|
||||
resolver (BaseAction | None):
|
||||
An action object that resolves the argument, if applicable.
|
||||
lazy_resolver (bool): True if the resolver should be called lazily, False otherwise
|
||||
suggestions (list[str] | None): A list of suggestions for the argument.
|
||||
"""
|
||||
|
||||
flags: tuple[str, ...]
|
||||
@ -40,6 +41,7 @@ class Argument:
|
||||
positional: bool = False
|
||||
resolver: BaseAction | None = None
|
||||
lazy_resolver: bool = False
|
||||
suggestions: list[str] | None = None
|
||||
|
||||
def get_positional_text(self) -> str:
|
||||
"""Get the positional text for the argument."""
|
||||
|
@ -4,7 +4,8 @@ from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from copy import deepcopy
|
||||
from typing import Any, Iterable
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Iterable, Sequence
|
||||
|
||||
from rich.console import Console
|
||||
from rich.markup import escape
|
||||
@ -19,6 +20,12 @@ from falyx.parser.utils import coerce_value
|
||||
from falyx.signals import HelpSignal
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArgumentState:
|
||||
arg: Argument
|
||||
consumed: bool = False
|
||||
|
||||
|
||||
class CommandArgumentParser:
|
||||
"""
|
||||
Custom argument parser for Falyx Commands.
|
||||
@ -64,6 +71,8 @@ class CommandArgumentParser:
|
||||
self._flag_map: dict[str, Argument] = {}
|
||||
self._dest_set: set[str] = set()
|
||||
self._add_help()
|
||||
self._last_positional_states: dict[str, ArgumentState] = {}
|
||||
self._last_keyword_states: dict[str, ArgumentState] = {}
|
||||
|
||||
def _add_help(self):
|
||||
"""Add help argument to the parser."""
|
||||
@ -359,19 +368,19 @@ class CommandArgumentParser:
|
||||
)
|
||||
|
||||
self._register_argument(argument)
|
||||
self._register_argument(negated_argument)
|
||||
self._register_argument(negated_argument, bypass_validation=True)
|
||||
|
||||
def _register_argument(self, argument: Argument):
|
||||
def _register_argument(
|
||||
self, argument: Argument, bypass_validation: bool = False
|
||||
) -> None:
|
||||
|
||||
for flag in argument.flags:
|
||||
if (
|
||||
flag in self._flag_map
|
||||
and not argument.action == ArgumentAction.STORE_BOOL_OPTIONAL
|
||||
):
|
||||
if flag in self._flag_map and not bypass_validation:
|
||||
existing = self._flag_map[flag]
|
||||
raise CommandArgumentError(
|
||||
f"Flag '{flag}' is already used by argument '{existing.dest}'"
|
||||
)
|
||||
|
||||
for flag in argument.flags:
|
||||
self._flag_map[flag] = argument
|
||||
if not argument.positional:
|
||||
@ -396,6 +405,7 @@ class CommandArgumentParser:
|
||||
dest: str | None = None,
|
||||
resolver: BaseAction | None = None,
|
||||
lazy_resolver: bool = True,
|
||||
suggestions: list[str] | None = None,
|
||||
) -> None:
|
||||
"""Add an argument to the parser.
|
||||
For `ArgumentAction.ACTION`, `nargs` and `type` determine how many and what kind
|
||||
@ -415,6 +425,8 @@ class CommandArgumentParser:
|
||||
help: A brief description of the argument.
|
||||
dest: The name of the attribute to be added to the object returned by parse_args().
|
||||
resolver: A BaseAction called with optional nargs specified parsed arguments.
|
||||
lazy_resolver: If True, the resolver is called lazily when the argument is accessed.
|
||||
suggestions: A list of suggestions for the argument.
|
||||
"""
|
||||
expected_type = type
|
||||
self._validate_flags(flags)
|
||||
@ -445,6 +457,10 @@ class CommandArgumentParser:
|
||||
f"Default value '{default}' not in allowed choices: {choices}"
|
||||
)
|
||||
required = self._determine_required(required, positional, nargs, action)
|
||||
if not isinstance(suggestions, Sequence) and suggestions is not None:
|
||||
raise CommandArgumentError(
|
||||
f"suggestions must be a list or None, got {type(suggestions)}"
|
||||
)
|
||||
if not isinstance(lazy_resolver, bool):
|
||||
raise CommandArgumentError(
|
||||
f"lazy_resolver must be a boolean, got {type(lazy_resolver)}"
|
||||
@ -465,6 +481,7 @@ class CommandArgumentParser:
|
||||
positional=positional,
|
||||
resolver=resolver,
|
||||
lazy_resolver=lazy_resolver,
|
||||
suggestions=suggestions,
|
||||
)
|
||||
self._register_argument(argument)
|
||||
|
||||
@ -490,6 +507,27 @@ class CommandArgumentParser:
|
||||
)
|
||||
return defs
|
||||
|
||||
def raise_remaining_args_error(
|
||||
self, token: str, arg_states: dict[str, ArgumentState]
|
||||
) -> None:
|
||||
consumed_dests = [
|
||||
state.arg.dest for state in arg_states.values() if state.consumed
|
||||
]
|
||||
remaining_flags = [
|
||||
flag
|
||||
for flag, arg in self._keyword.items()
|
||||
if arg.dest not in consumed_dests and flag.startswith(token)
|
||||
]
|
||||
|
||||
if remaining_flags:
|
||||
raise CommandArgumentError(
|
||||
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(remaining_flags)}?"
|
||||
)
|
||||
else:
|
||||
raise CommandArgumentError(
|
||||
f"Unrecognized option '{token}'. Use --help to see available options."
|
||||
)
|
||||
|
||||
def _consume_nargs(
|
||||
self, args: list[str], start: int, spec: Argument
|
||||
) -> tuple[list[str], int]:
|
||||
@ -535,6 +573,7 @@ class CommandArgumentParser:
|
||||
result: dict[str, Any],
|
||||
positional_args: list[Argument],
|
||||
consumed_positional_indicies: set[int],
|
||||
arg_states: dict[str, ArgumentState],
|
||||
from_validate: bool = False,
|
||||
) -> int:
|
||||
remaining_positional_args = [
|
||||
@ -580,17 +619,7 @@ class CommandArgumentParser:
|
||||
except Exception as error:
|
||||
if len(args[i - new_i :]) == 1 and args[i - new_i].startswith("-"):
|
||||
token = args[i - new_i]
|
||||
valid_flags = [
|
||||
flag for flag in self._flag_map if flag.startswith(token)
|
||||
]
|
||||
if valid_flags:
|
||||
raise CommandArgumentError(
|
||||
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
|
||||
) from error
|
||||
else:
|
||||
raise CommandArgumentError(
|
||||
f"Unrecognized option '{token}'. Use --help to see available options."
|
||||
) from error
|
||||
self.raise_remaining_args_error(token, arg_states)
|
||||
else:
|
||||
raise CommandArgumentError(
|
||||
f"Invalid value for '{spec.dest}': {error}"
|
||||
@ -606,6 +635,7 @@ class CommandArgumentParser:
|
||||
raise CommandArgumentError(
|
||||
f"[{spec.dest}] Action failed: {error}"
|
||||
) from error
|
||||
arg_states[spec.dest].consumed = True
|
||||
elif not typed and spec.default:
|
||||
result[spec.dest] = spec.default
|
||||
elif spec.action == ArgumentAction.APPEND:
|
||||
@ -618,8 +648,10 @@ class CommandArgumentParser:
|
||||
assert result.get(spec.dest) is not None, "dest should not be None"
|
||||
result[spec.dest].extend(typed)
|
||||
elif spec.nargs in (None, 1, "?"):
|
||||
arg_states[spec.dest].consumed = True
|
||||
result[spec.dest] = typed[0] if len(typed) == 1 else typed
|
||||
else:
|
||||
arg_states[spec.dest].consumed = True
|
||||
result[spec.dest] = typed
|
||||
|
||||
if spec.nargs not in ("*", "+"):
|
||||
@ -628,15 +660,7 @@ class CommandArgumentParser:
|
||||
if i < len(args):
|
||||
if len(args[i:]) == 1 and args[i].startswith("-"):
|
||||
token = args[i]
|
||||
valid_flags = [flag for flag in self._flag_map if flag.startswith(token)]
|
||||
if valid_flags:
|
||||
raise CommandArgumentError(
|
||||
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
|
||||
)
|
||||
else:
|
||||
raise CommandArgumentError(
|
||||
f"Unrecognized option '{token}'. Use --help to see available options."
|
||||
)
|
||||
self.raise_remaining_args_error(token, arg_states)
|
||||
else:
|
||||
plural = "s" if len(args[i:]) > 1 else ""
|
||||
raise CommandArgumentError(
|
||||
@ -670,6 +694,7 @@ class CommandArgumentParser:
|
||||
positional_args: list[Argument],
|
||||
consumed_positional_indices: set[int],
|
||||
consumed_indices: set[int],
|
||||
arg_states: dict[str, ArgumentState],
|
||||
from_validate: bool = False,
|
||||
) -> int:
|
||||
if token in self._keyword:
|
||||
@ -679,6 +704,7 @@ class CommandArgumentParser:
|
||||
if action == ArgumentAction.HELP:
|
||||
if not from_validate:
|
||||
self.render_help()
|
||||
arg_states[spec.dest].consumed = True
|
||||
raise HelpSignal()
|
||||
elif action == ArgumentAction.ACTION:
|
||||
assert isinstance(
|
||||
@ -691,24 +717,29 @@ class CommandArgumentParser:
|
||||
raise CommandArgumentError(
|
||||
f"Invalid value for '{spec.dest}': {error}"
|
||||
) from error
|
||||
if not spec.lazy_resolver or not from_validate:
|
||||
try:
|
||||
result[spec.dest] = await spec.resolver(*typed_values)
|
||||
except Exception as error:
|
||||
raise CommandArgumentError(
|
||||
f"[{spec.dest}] Action failed: {error}"
|
||||
) from error
|
||||
arg_states[spec.dest].consumed = True
|
||||
consumed_indices.update(range(i, new_i))
|
||||
i = new_i
|
||||
elif action == ArgumentAction.STORE_TRUE:
|
||||
result[spec.dest] = True
|
||||
arg_states[spec.dest].consumed = True
|
||||
consumed_indices.add(i)
|
||||
i += 1
|
||||
elif action == ArgumentAction.STORE_FALSE:
|
||||
result[spec.dest] = False
|
||||
arg_states[spec.dest].consumed = True
|
||||
consumed_indices.add(i)
|
||||
i += 1
|
||||
elif action == ArgumentAction.STORE_BOOL_OPTIONAL:
|
||||
result[spec.dest] = spec.type(True)
|
||||
arg_states[spec.dest].consumed = True
|
||||
consumed_indices.add(i)
|
||||
i += 1
|
||||
elif action == ArgumentAction.COUNT:
|
||||
@ -778,19 +809,11 @@ class CommandArgumentParser:
|
||||
)
|
||||
else:
|
||||
result[spec.dest] = typed_values
|
||||
arg_states[spec.dest].consumed = True
|
||||
consumed_indices.update(range(i, new_i))
|
||||
i = new_i
|
||||
elif token.startswith("-"):
|
||||
# Handle unrecognized option
|
||||
valid_flags = [flag for flag in self._flag_map if flag.startswith(token)]
|
||||
if valid_flags:
|
||||
raise CommandArgumentError(
|
||||
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
|
||||
)
|
||||
else:
|
||||
raise CommandArgumentError(
|
||||
f"Unrecognized option '{token}'. Use --help to see available options."
|
||||
)
|
||||
self.raise_remaining_args_error(token, arg_states)
|
||||
else:
|
||||
# Get the next flagged argument index if it exists
|
||||
next_flagged_index = -1
|
||||
@ -805,6 +828,7 @@ class CommandArgumentParser:
|
||||
result,
|
||||
positional_args,
|
||||
consumed_positional_indices,
|
||||
arg_states=arg_states,
|
||||
from_validate=from_validate,
|
||||
)
|
||||
i += args_consumed
|
||||
@ -817,6 +841,14 @@ class CommandArgumentParser:
|
||||
if args is None:
|
||||
args = []
|
||||
|
||||
arg_states = {arg.dest: ArgumentState(arg) for arg in self._arguments}
|
||||
self._last_positional_states = {
|
||||
arg.dest: arg_states[arg.dest] for arg in self._positional.values()
|
||||
}
|
||||
self._last_keyword_states = {
|
||||
arg.dest: arg_states[arg.dest] for arg in self._keyword_list
|
||||
}
|
||||
|
||||
result = {arg.dest: deepcopy(arg.default) for arg in self._arguments}
|
||||
positional_args: list[Argument] = [
|
||||
arg for arg in self._arguments if arg.positional
|
||||
@ -838,6 +870,7 @@ class CommandArgumentParser:
|
||||
positional_args,
|
||||
consumed_positional_indices,
|
||||
consumed_indices,
|
||||
arg_states=arg_states,
|
||||
from_validate=from_validate,
|
||||
)
|
||||
|
||||
@ -862,6 +895,7 @@ class CommandArgumentParser:
|
||||
)
|
||||
|
||||
if spec.choices and result.get(spec.dest) not in spec.choices:
|
||||
arg_states[spec.dest].consumed = False
|
||||
raise CommandArgumentError(
|
||||
f"Invalid value for '{spec.dest}': must be one of {{{', '.join(spec.choices)}}}"
|
||||
)
|
||||
@ -914,6 +948,91 @@ class CommandArgumentParser:
|
||||
kwargs_dict[arg.dest] = parsed[arg.dest]
|
||||
return tuple(args_list), kwargs_dict
|
||||
|
||||
def suggest_next(self, args: list[str]) -> list[str]:
|
||||
"""
|
||||
Suggest the next possible flags or values given partially typed arguments.
|
||||
|
||||
This does NOT raise errors. It is intended for completions, not validation.
|
||||
|
||||
Returns:
|
||||
A list of possible completions based on the current input.
|
||||
"""
|
||||
|
||||
# Case 1: Next positional argument
|
||||
next_non_consumed_positional: Argument | None = None
|
||||
for state in self._last_positional_states.values():
|
||||
if not state.consumed:
|
||||
next_non_consumed_positional = state.arg
|
||||
break
|
||||
if next_non_consumed_positional:
|
||||
if next_non_consumed_positional.choices:
|
||||
return sorted(
|
||||
(str(choice) for choice in next_non_consumed_positional.choices)
|
||||
)
|
||||
if next_non_consumed_positional.suggestions:
|
||||
return sorted(next_non_consumed_positional.suggestions)
|
||||
|
||||
consumed_dests = [
|
||||
state.arg.dest
|
||||
for state in self._last_keyword_states.values()
|
||||
if state.consumed
|
||||
]
|
||||
|
||||
remaining_flags = [
|
||||
flag for flag, arg in self._keyword.items() if arg.dest not in consumed_dests
|
||||
]
|
||||
|
||||
last = args[-1]
|
||||
next_to_last = args[-2] if len(args) > 1 else ""
|
||||
suggestions: list[str] = []
|
||||
|
||||
# Case 2: Mid-flag (e.g., "--ver")
|
||||
if last.startswith("-") and last not in self._keyword:
|
||||
if (
|
||||
len(args) > 1
|
||||
and next_to_last in self._keyword
|
||||
and next_to_last in remaining_flags
|
||||
):
|
||||
# If the last token is a mid-flag, suggest based on the previous flag
|
||||
arg = self._keyword[next_to_last]
|
||||
if arg.choices:
|
||||
suggestions.extend(arg.choices)
|
||||
elif arg.suggestions:
|
||||
suggestions.extend(arg.suggestions)
|
||||
else:
|
||||
possible_flags = [
|
||||
flag
|
||||
for flag, arg in self._keyword.items()
|
||||
if flag.startswith(last) and arg.dest not in consumed_dests
|
||||
]
|
||||
suggestions.extend(possible_flags)
|
||||
# Case 3: Flag that expects a value (e.g., ["--tag"])
|
||||
elif last in self._keyword:
|
||||
arg = self._keyword[last]
|
||||
if arg.choices:
|
||||
suggestions.extend(arg.choices)
|
||||
elif arg.suggestions:
|
||||
suggestions.extend(arg.suggestions)
|
||||
# Case 4: Last flag with choices mid-choice (e.g., ["--tag", "v"])
|
||||
elif next_to_last in self._keyword:
|
||||
arg = self._keyword[next_to_last]
|
||||
if arg.choices and last not in arg.choices:
|
||||
suggestions.extend(arg.choices)
|
||||
elif (
|
||||
arg.suggestions
|
||||
and last not in arg.suggestions
|
||||
and not any(last.startswith(suggestion) for suggestion in arg.suggestions)
|
||||
and any(suggestion.startswith(last) for suggestion in arg.suggestions)
|
||||
):
|
||||
suggestions.extend(arg.suggestions)
|
||||
else:
|
||||
suggestions.extend(remaining_flags)
|
||||
# Case 5: Suggest all remaining flags
|
||||
else:
|
||||
suggestions.extend(remaining_flags)
|
||||
|
||||
return sorted(set(suggestions))
|
||||
|
||||
def get_options_text(self, plain_text=False) -> str:
|
||||
# Options
|
||||
# Add all keyword arguments to the options list
|
||||
|
@ -54,8 +54,10 @@ def infer_args_from_func(
|
||||
if arg_type is bool:
|
||||
if param.default is False:
|
||||
action = "store_true"
|
||||
else:
|
||||
default = None
|
||||
elif param.default is True:
|
||||
action = "store_false"
|
||||
default = None
|
||||
|
||||
if arg_type is list:
|
||||
action = "append"
|
||||
@ -75,6 +77,7 @@ def infer_args_from_func(
|
||||
"action": action,
|
||||
"help": metadata.get("help", ""),
|
||||
"choices": metadata.get("choices"),
|
||||
"suggestions": metadata.get("suggestions"),
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -1 +1 @@
|
||||
__version__ = "0.1.62"
|
||||
__version__ = "0.1.63"
|
||||
|
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "falyx"
|
||||
version = "0.1.62"
|
||||
version = "0.1.63"
|
||||
description = "Reliable and introspectable async CLI action framework."
|
||||
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
|
||||
license = "MIT"
|
||||
|
18
tests/test_parsers/test_completions.py
Normal file
18
tests/test_parsers/test_completions.py
Normal file
@ -0,0 +1,18 @@
|
||||
import pytest
|
||||
|
||||
from falyx.parser.command_argument_parser import CommandArgumentParser
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"input_tokens, expected",
|
||||
[
|
||||
([""], ["--help", "--tag", "-h"]),
|
||||
(["--ta"], ["--tag"]),
|
||||
(["--tag"], ["analytics", "build"]),
|
||||
],
|
||||
)
|
||||
async def test_suggest_next(input_tokens, expected):
|
||||
parser = CommandArgumentParser(...)
|
||||
parser.add_argument("--tag", choices=["analytics", "build"])
|
||||
assert sorted(parser.suggest_next(input_tokens)) == sorted(expected)
|
Reference in New Issue
Block a user