Compare commits

3 Commits

9 changed files with 223 additions and 49 deletions

View File

@ -21,11 +21,13 @@ async def test_args(
service: str,
place: Place = Place.NEW_YORK,
region: str = "us-east-1",
tag: str | None = None,
verbose: bool | None = None,
number: int | None = None,
) -> str:
if verbose:
print(f"Deploying {service} to {region} at {place}...")
return f"{service} deployed to {region} at {place}"
print(f"Deploying {service}:{tag}:{number} to {region} at {place}...")
return f"{service}:{tag}:{number} deployed to {region} at {place}"
def default_config(parser: CommandArgumentParser) -> None:
@ -55,6 +57,17 @@ def default_config(parser: CommandArgumentParser) -> None:
action="store_bool_optional",
help="Enable verbose output.",
)
parser.add_argument(
"--tag",
type=str,
help="Optional tag for the deployment.",
suggestions=["latest", "stable", "beta"],
)
parser.add_argument(
"--number",
type=int,
help="Optional number argument.",
)
flx = Falyx("Argument Examples")

View File

@ -29,6 +29,26 @@ class FalyxCompleter(Completer):
yield from self._suggest_commands(tokens[0] if tokens else "")
return
# Identify command
command_key = tokens[0].upper()
command = self.falyx._name_map.get(command_key)
if not command or not command.arg_parser:
return
# If at end of token, e.g., "--t" vs "--tag ", add a stub so suggest_next sees it
parsed_args = tokens[1:] if cursor_at_end_of_token else tokens[1:-1]
stub = "" if cursor_at_end_of_token else tokens[-1]
try:
suggestions = command.arg_parser.suggest_next(
parsed_args + ([stub] if stub else [])
)
for suggestion in suggestions:
if suggestion.startswith(stub):
yield Completion(suggestion, start_position=-len(stub))
except Exception:
return
def _suggest_commands(self, prefix: str) -> Iterable[Completion]:
prefix = prefix.upper()
keys = [self.falyx.exit_command.key]

View File

@ -507,7 +507,6 @@ class Falyx:
message=self.prompt,
multiline=False,
completer=self._get_completer(),
reserve_space_for_menu=1,
validator=CommandValidator(self, self._get_validator_error_message()),
bottom_toolbar=self._get_bottom_bar_render(),
key_bindings=self.key_bindings,

View File

@ -26,6 +26,7 @@ class Argument:
resolver (BaseAction | None):
An action object that resolves the argument, if applicable.
lazy_resolver (bool): True if the resolver should be called lazily, False otherwise
suggestions (list[str] | None): A list of suggestions for the argument.
"""
flags: tuple[str, ...]
@ -40,6 +41,7 @@ class Argument:
positional: bool = False
resolver: BaseAction | None = None
lazy_resolver: bool = False
suggestions: list[str] | None = None
def get_positional_text(self) -> str:
"""Get the positional text for the argument."""

View File

@ -4,7 +4,8 @@ from __future__ import annotations
from collections import defaultdict
from copy import deepcopy
from typing import Any, Iterable
from dataclasses import dataclass
from typing import Any, Iterable, Sequence
from rich.console import Console
from rich.markup import escape
@ -19,6 +20,12 @@ from falyx.parser.utils import coerce_value
from falyx.signals import HelpSignal
@dataclass
class ArgumentState:
arg: Argument
consumed: bool = False
class CommandArgumentParser:
"""
Custom argument parser for Falyx Commands.
@ -64,6 +71,8 @@ class CommandArgumentParser:
self._flag_map: dict[str, Argument] = {}
self._dest_set: set[str] = set()
self._add_help()
self._last_positional_states: dict[str, ArgumentState] = {}
self._last_keyword_states: dict[str, ArgumentState] = {}
def _add_help(self):
"""Add help argument to the parser."""
@ -359,19 +368,19 @@ class CommandArgumentParser:
)
self._register_argument(argument)
self._register_argument(negated_argument)
self._register_argument(negated_argument, bypass_validation=True)
def _register_argument(self, argument: Argument):
def _register_argument(
self, argument: Argument, bypass_validation: bool = False
) -> None:
for flag in argument.flags:
if (
flag in self._flag_map
and not argument.action == ArgumentAction.STORE_BOOL_OPTIONAL
):
if flag in self._flag_map and not bypass_validation:
existing = self._flag_map[flag]
raise CommandArgumentError(
f"Flag '{flag}' is already used by argument '{existing.dest}'"
)
for flag in argument.flags:
self._flag_map[flag] = argument
if not argument.positional:
@ -396,6 +405,7 @@ class CommandArgumentParser:
dest: str | None = None,
resolver: BaseAction | None = None,
lazy_resolver: bool = True,
suggestions: list[str] | None = None,
) -> None:
"""Add an argument to the parser.
For `ArgumentAction.ACTION`, `nargs` and `type` determine how many and what kind
@ -415,6 +425,8 @@ class CommandArgumentParser:
help: A brief description of the argument.
dest: The name of the attribute to be added to the object returned by parse_args().
resolver: A BaseAction called with optional nargs specified parsed arguments.
lazy_resolver: If True, the resolver is called lazily when the argument is accessed.
suggestions: A list of suggestions for the argument.
"""
expected_type = type
self._validate_flags(flags)
@ -445,6 +457,10 @@ class CommandArgumentParser:
f"Default value '{default}' not in allowed choices: {choices}"
)
required = self._determine_required(required, positional, nargs, action)
if not isinstance(suggestions, Sequence) and suggestions is not None:
raise CommandArgumentError(
f"suggestions must be a list or None, got {type(suggestions)}"
)
if not isinstance(lazy_resolver, bool):
raise CommandArgumentError(
f"lazy_resolver must be a boolean, got {type(lazy_resolver)}"
@ -465,6 +481,7 @@ class CommandArgumentParser:
positional=positional,
resolver=resolver,
lazy_resolver=lazy_resolver,
suggestions=suggestions,
)
self._register_argument(argument)
@ -490,6 +507,27 @@ class CommandArgumentParser:
)
return defs
def raise_remaining_args_error(
self, token: str, arg_states: dict[str, ArgumentState]
) -> None:
consumed_dests = [
state.arg.dest for state in arg_states.values() if state.consumed
]
remaining_flags = [
flag
for flag, arg in self._keyword.items()
if arg.dest not in consumed_dests and flag.startswith(token)
]
if remaining_flags:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(remaining_flags)}?"
)
else:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Use --help to see available options."
)
def _consume_nargs(
self, args: list[str], start: int, spec: Argument
) -> tuple[list[str], int]:
@ -535,6 +573,7 @@ class CommandArgumentParser:
result: dict[str, Any],
positional_args: list[Argument],
consumed_positional_indicies: set[int],
arg_states: dict[str, ArgumentState],
from_validate: bool = False,
) -> int:
remaining_positional_args = [
@ -580,17 +619,7 @@ class CommandArgumentParser:
except Exception as error:
if len(args[i - new_i :]) == 1 and args[i - new_i].startswith("-"):
token = args[i - new_i]
valid_flags = [
flag for flag in self._flag_map if flag.startswith(token)
]
if valid_flags:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
) from error
else:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Use --help to see available options."
) from error
self.raise_remaining_args_error(token, arg_states)
else:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
@ -606,6 +635,7 @@ class CommandArgumentParser:
raise CommandArgumentError(
f"[{spec.dest}] Action failed: {error}"
) from error
arg_states[spec.dest].consumed = True
elif not typed and spec.default:
result[spec.dest] = spec.default
elif spec.action == ArgumentAction.APPEND:
@ -618,8 +648,10 @@ class CommandArgumentParser:
assert result.get(spec.dest) is not None, "dest should not be None"
result[spec.dest].extend(typed)
elif spec.nargs in (None, 1, "?"):
arg_states[spec.dest].consumed = True
result[spec.dest] = typed[0] if len(typed) == 1 else typed
else:
arg_states[spec.dest].consumed = True
result[spec.dest] = typed
if spec.nargs not in ("*", "+"):
@ -628,15 +660,7 @@ class CommandArgumentParser:
if i < len(args):
if len(args[i:]) == 1 and args[i].startswith("-"):
token = args[i]
valid_flags = [flag for flag in self._flag_map if flag.startswith(token)]
if valid_flags:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
)
else:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Use --help to see available options."
)
self.raise_remaining_args_error(token, arg_states)
else:
plural = "s" if len(args[i:]) > 1 else ""
raise CommandArgumentError(
@ -670,6 +694,7 @@ class CommandArgumentParser:
positional_args: list[Argument],
consumed_positional_indices: set[int],
consumed_indices: set[int],
arg_states: dict[str, ArgumentState],
from_validate: bool = False,
) -> int:
if token in self._keyword:
@ -679,6 +704,7 @@ class CommandArgumentParser:
if action == ArgumentAction.HELP:
if not from_validate:
self.render_help()
arg_states[spec.dest].consumed = True
raise HelpSignal()
elif action == ArgumentAction.ACTION:
assert isinstance(
@ -691,24 +717,29 @@ class CommandArgumentParser:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
if not spec.lazy_resolver or not from_validate:
try:
result[spec.dest] = await spec.resolver(*typed_values)
except Exception as error:
raise CommandArgumentError(
f"[{spec.dest}] Action failed: {error}"
) from error
arg_states[spec.dest].consumed = True
consumed_indices.update(range(i, new_i))
i = new_i
elif action == ArgumentAction.STORE_TRUE:
result[spec.dest] = True
arg_states[spec.dest].consumed = True
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.STORE_FALSE:
result[spec.dest] = False
arg_states[spec.dest].consumed = True
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.STORE_BOOL_OPTIONAL:
result[spec.dest] = spec.type(True)
arg_states[spec.dest].consumed = True
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.COUNT:
@ -778,19 +809,11 @@ class CommandArgumentParser:
)
else:
result[spec.dest] = typed_values
arg_states[spec.dest].consumed = True
consumed_indices.update(range(i, new_i))
i = new_i
elif token.startswith("-"):
# Handle unrecognized option
valid_flags = [flag for flag in self._flag_map if flag.startswith(token)]
if valid_flags:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
)
else:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Use --help to see available options."
)
self.raise_remaining_args_error(token, arg_states)
else:
# Get the next flagged argument index if it exists
next_flagged_index = -1
@ -805,6 +828,7 @@ class CommandArgumentParser:
result,
positional_args,
consumed_positional_indices,
arg_states=arg_states,
from_validate=from_validate,
)
i += args_consumed
@ -817,6 +841,14 @@ class CommandArgumentParser:
if args is None:
args = []
arg_states = {arg.dest: ArgumentState(arg) for arg in self._arguments}
self._last_positional_states = {
arg.dest: arg_states[arg.dest] for arg in self._positional.values()
}
self._last_keyword_states = {
arg.dest: arg_states[arg.dest] for arg in self._keyword_list
}
result = {arg.dest: deepcopy(arg.default) for arg in self._arguments}
positional_args: list[Argument] = [
arg for arg in self._arguments if arg.positional
@ -838,6 +870,7 @@ class CommandArgumentParser:
positional_args,
consumed_positional_indices,
consumed_indices,
arg_states=arg_states,
from_validate=from_validate,
)
@ -862,6 +895,7 @@ class CommandArgumentParser:
)
if spec.choices and result.get(spec.dest) not in spec.choices:
arg_states[spec.dest].consumed = False
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': must be one of {{{', '.join(spec.choices)}}}"
)
@ -914,6 +948,91 @@ class CommandArgumentParser:
kwargs_dict[arg.dest] = parsed[arg.dest]
return tuple(args_list), kwargs_dict
def suggest_next(self, args: list[str]) -> list[str]:
"""
Suggest the next possible flags or values given partially typed arguments.
This does NOT raise errors. It is intended for completions, not validation.
Returns:
A list of possible completions based on the current input.
"""
# Case 1: Next positional argument
next_non_consumed_positional: Argument | None = None
for state in self._last_positional_states.values():
if not state.consumed:
next_non_consumed_positional = state.arg
break
if next_non_consumed_positional:
if next_non_consumed_positional.choices:
return sorted(
(str(choice) for choice in next_non_consumed_positional.choices)
)
if next_non_consumed_positional.suggestions:
return sorted(next_non_consumed_positional.suggestions)
consumed_dests = [
state.arg.dest
for state in self._last_keyword_states.values()
if state.consumed
]
remaining_flags = [
flag for flag, arg in self._keyword.items() if arg.dest not in consumed_dests
]
last = args[-1]
next_to_last = args[-2] if len(args) > 1 else ""
suggestions: list[str] = []
# Case 2: Mid-flag (e.g., "--ver")
if last.startswith("-") and last not in self._keyword:
if (
len(args) > 1
and next_to_last in self._keyword
and next_to_last in remaining_flags
):
# If the last token is a mid-flag, suggest based on the previous flag
arg = self._keyword[next_to_last]
if arg.choices:
suggestions.extend(arg.choices)
elif arg.suggestions:
suggestions.extend(arg.suggestions)
else:
possible_flags = [
flag
for flag, arg in self._keyword.items()
if flag.startswith(last) and arg.dest not in consumed_dests
]
suggestions.extend(possible_flags)
# Case 3: Flag that expects a value (e.g., ["--tag"])
elif last in self._keyword:
arg = self._keyword[last]
if arg.choices:
suggestions.extend(arg.choices)
elif arg.suggestions:
suggestions.extend(arg.suggestions)
# Case 4: Last flag with choices mid-choice (e.g., ["--tag", "v"])
elif next_to_last in self._keyword:
arg = self._keyword[next_to_last]
if arg.choices and last not in arg.choices:
suggestions.extend(arg.choices)
elif (
arg.suggestions
and last not in arg.suggestions
and not any(last.startswith(suggestion) for suggestion in arg.suggestions)
and any(suggestion.startswith(last) for suggestion in arg.suggestions)
):
suggestions.extend(arg.suggestions)
else:
suggestions.extend(remaining_flags)
# Case 5: Suggest all remaining flags
else:
suggestions.extend(remaining_flags)
return sorted(set(suggestions))
def get_options_text(self, plain_text=False) -> str:
# Options
# Add all keyword arguments to the options list

View File

@ -54,8 +54,10 @@ def infer_args_from_func(
if arg_type is bool:
if param.default is False:
action = "store_true"
else:
default = None
elif param.default is True:
action = "store_false"
default = None
if arg_type is list:
action = "append"
@ -75,6 +77,7 @@ def infer_args_from_func(
"action": action,
"help": metadata.get("help", ""),
"choices": metadata.get("choices"),
"suggestions": metadata.get("suggestions"),
}
)

View File

@ -1 +1 @@
__version__ = "0.1.62"
__version__ = "0.1.63"

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "falyx"
version = "0.1.62"
version = "0.1.63"
description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT"

View File

@ -0,0 +1,18 @@
import pytest
from falyx.parser.command_argument_parser import CommandArgumentParser
@pytest.mark.asyncio
@pytest.mark.parametrize(
"input_tokens, expected",
[
([""], ["--help", "--tag", "-h"]),
(["--ta"], ["--tag"]),
(["--tag"], ["analytics", "build"]),
],
)
async def test_suggest_next(input_tokens, expected):
parser = CommandArgumentParser(...)
parser.add_argument("--tag", choices=["analytics", "build"])
assert sorted(parser.suggest_next(input_tokens)) == sorted(expected)