completions #4

Merged
roland merged 2 commits from completions into main 2025-07-17 20:16:20 -04:00
9 changed files with 223 additions and 49 deletions

View File

@ -21,11 +21,13 @@ async def test_args(
service: str, service: str,
place: Place = Place.NEW_YORK, place: Place = Place.NEW_YORK,
region: str = "us-east-1", region: str = "us-east-1",
tag: str | None = None,
verbose: bool | None = None, verbose: bool | None = None,
number: int | None = None,
) -> str: ) -> str:
if verbose: if verbose:
print(f"Deploying {service} to {region} at {place}...") print(f"Deploying {service}:{tag}:{number} to {region} at {place}...")
return f"{service} deployed to {region} at {place}" return f"{service}:{tag}:{number} deployed to {region} at {place}"
def default_config(parser: CommandArgumentParser) -> None: def default_config(parser: CommandArgumentParser) -> None:
@ -55,6 +57,17 @@ def default_config(parser: CommandArgumentParser) -> None:
action="store_bool_optional", action="store_bool_optional",
help="Enable verbose output.", help="Enable verbose output.",
) )
parser.add_argument(
"--tag",
type=str,
help="Optional tag for the deployment.",
suggestions=["latest", "stable", "beta"],
)
parser.add_argument(
"--number",
type=int,
help="Optional number argument.",
)
flx = Falyx("Argument Examples") flx = Falyx("Argument Examples")

View File

@ -29,6 +29,26 @@ class FalyxCompleter(Completer):
yield from self._suggest_commands(tokens[0] if tokens else "") yield from self._suggest_commands(tokens[0] if tokens else "")
return return
# Identify command
command_key = tokens[0].upper()
command = self.falyx._name_map.get(command_key)
if not command or not command.arg_parser:
return
# If at end of token, e.g., "--t" vs "--tag ", add a stub so suggest_next sees it
parsed_args = tokens[1:] if cursor_at_end_of_token else tokens[1:-1]
stub = "" if cursor_at_end_of_token else tokens[-1]
try:
suggestions = command.arg_parser.suggest_next(
parsed_args + ([stub] if stub else [])
)
for suggestion in suggestions:
if suggestion.startswith(stub):
yield Completion(suggestion, start_position=-len(stub))
except Exception:
return
def _suggest_commands(self, prefix: str) -> Iterable[Completion]: def _suggest_commands(self, prefix: str) -> Iterable[Completion]:
prefix = prefix.upper() prefix = prefix.upper()
keys = [self.falyx.exit_command.key] keys = [self.falyx.exit_command.key]

View File

@ -507,7 +507,6 @@ class Falyx:
message=self.prompt, message=self.prompt,
multiline=False, multiline=False,
completer=self._get_completer(), completer=self._get_completer(),
reserve_space_for_menu=1,
validator=CommandValidator(self, self._get_validator_error_message()), validator=CommandValidator(self, self._get_validator_error_message()),
bottom_toolbar=self._get_bottom_bar_render(), bottom_toolbar=self._get_bottom_bar_render(),
key_bindings=self.key_bindings, key_bindings=self.key_bindings,

View File

@ -26,6 +26,7 @@ class Argument:
resolver (BaseAction | None): resolver (BaseAction | None):
An action object that resolves the argument, if applicable. An action object that resolves the argument, if applicable.
lazy_resolver (bool): True if the resolver should be called lazily, False otherwise lazy_resolver (bool): True if the resolver should be called lazily, False otherwise
suggestions (list[str] | None): A list of suggestions for the argument.
""" """
flags: tuple[str, ...] flags: tuple[str, ...]
@ -40,6 +41,7 @@ class Argument:
positional: bool = False positional: bool = False
resolver: BaseAction | None = None resolver: BaseAction | None = None
lazy_resolver: bool = False lazy_resolver: bool = False
suggestions: list[str] | None = None
def get_positional_text(self) -> str: def get_positional_text(self) -> str:
"""Get the positional text for the argument.""" """Get the positional text for the argument."""

View File

@ -4,7 +4,8 @@ from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from copy import deepcopy from copy import deepcopy
from typing import Any, Iterable from dataclasses import dataclass
from typing import Any, Iterable, Sequence
from rich.console import Console from rich.console import Console
from rich.markup import escape from rich.markup import escape
@ -19,6 +20,12 @@ from falyx.parser.utils import coerce_value
from falyx.signals import HelpSignal from falyx.signals import HelpSignal
@dataclass
class ArgumentState:
arg: Argument
consumed: bool = False
class CommandArgumentParser: class CommandArgumentParser:
""" """
Custom argument parser for Falyx Commands. Custom argument parser for Falyx Commands.
@ -64,6 +71,8 @@ class CommandArgumentParser:
self._flag_map: dict[str, Argument] = {} self._flag_map: dict[str, Argument] = {}
self._dest_set: set[str] = set() self._dest_set: set[str] = set()
self._add_help() self._add_help()
self._last_positional_states: dict[str, ArgumentState] = {}
self._last_keyword_states: dict[str, ArgumentState] = {}
def _add_help(self): def _add_help(self):
"""Add help argument to the parser.""" """Add help argument to the parser."""
@ -359,19 +368,19 @@ class CommandArgumentParser:
) )
self._register_argument(argument) self._register_argument(argument)
self._register_argument(negated_argument) self._register_argument(negated_argument, bypass_validation=True)
def _register_argument(self, argument: Argument): def _register_argument(
self, argument: Argument, bypass_validation: bool = False
) -> None:
for flag in argument.flags: for flag in argument.flags:
if ( if flag in self._flag_map and not bypass_validation:
flag in self._flag_map
and not argument.action == ArgumentAction.STORE_BOOL_OPTIONAL
):
existing = self._flag_map[flag] existing = self._flag_map[flag]
raise CommandArgumentError( raise CommandArgumentError(
f"Flag '{flag}' is already used by argument '{existing.dest}'" f"Flag '{flag}' is already used by argument '{existing.dest}'"
) )
for flag in argument.flags: for flag in argument.flags:
self._flag_map[flag] = argument self._flag_map[flag] = argument
if not argument.positional: if not argument.positional:
@ -396,6 +405,7 @@ class CommandArgumentParser:
dest: str | None = None, dest: str | None = None,
resolver: BaseAction | None = None, resolver: BaseAction | None = None,
lazy_resolver: bool = True, lazy_resolver: bool = True,
suggestions: list[str] | None = None,
) -> None: ) -> None:
"""Add an argument to the parser. """Add an argument to the parser.
For `ArgumentAction.ACTION`, `nargs` and `type` determine how many and what kind For `ArgumentAction.ACTION`, `nargs` and `type` determine how many and what kind
@ -415,6 +425,8 @@ class CommandArgumentParser:
help: A brief description of the argument. help: A brief description of the argument.
dest: The name of the attribute to be added to the object returned by parse_args(). dest: The name of the attribute to be added to the object returned by parse_args().
resolver: A BaseAction called with optional nargs specified parsed arguments. resolver: A BaseAction called with optional nargs specified parsed arguments.
lazy_resolver: If True, the resolver is called lazily when the argument is accessed.
suggestions: A list of suggestions for the argument.
""" """
expected_type = type expected_type = type
self._validate_flags(flags) self._validate_flags(flags)
@ -445,6 +457,10 @@ class CommandArgumentParser:
f"Default value '{default}' not in allowed choices: {choices}" f"Default value '{default}' not in allowed choices: {choices}"
) )
required = self._determine_required(required, positional, nargs, action) required = self._determine_required(required, positional, nargs, action)
if not isinstance(suggestions, Sequence) and suggestions is not None:
raise CommandArgumentError(
f"suggestions must be a list or None, got {type(suggestions)}"
)
if not isinstance(lazy_resolver, bool): if not isinstance(lazy_resolver, bool):
raise CommandArgumentError( raise CommandArgumentError(
f"lazy_resolver must be a boolean, got {type(lazy_resolver)}" f"lazy_resolver must be a boolean, got {type(lazy_resolver)}"
@ -465,6 +481,7 @@ class CommandArgumentParser:
positional=positional, positional=positional,
resolver=resolver, resolver=resolver,
lazy_resolver=lazy_resolver, lazy_resolver=lazy_resolver,
suggestions=suggestions,
) )
self._register_argument(argument) self._register_argument(argument)
@ -490,6 +507,27 @@ class CommandArgumentParser:
) )
return defs return defs
def raise_remaining_args_error(
self, token: str, arg_states: dict[str, ArgumentState]
) -> None:
consumed_dests = [
state.arg.dest for state in arg_states.values() if state.consumed
]
remaining_flags = [
flag
for flag, arg in self._keyword.items()
if arg.dest not in consumed_dests and flag.startswith(token)
]
if remaining_flags:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(remaining_flags)}?"
)
else:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Use --help to see available options."
)
def _consume_nargs( def _consume_nargs(
self, args: list[str], start: int, spec: Argument self, args: list[str], start: int, spec: Argument
) -> tuple[list[str], int]: ) -> tuple[list[str], int]:
@ -535,6 +573,7 @@ class CommandArgumentParser:
result: dict[str, Any], result: dict[str, Any],
positional_args: list[Argument], positional_args: list[Argument],
consumed_positional_indicies: set[int], consumed_positional_indicies: set[int],
arg_states: dict[str, ArgumentState],
from_validate: bool = False, from_validate: bool = False,
) -> int: ) -> int:
remaining_positional_args = [ remaining_positional_args = [
@ -580,17 +619,7 @@ class CommandArgumentParser:
except Exception as error: except Exception as error:
if len(args[i - new_i :]) == 1 and args[i - new_i].startswith("-"): if len(args[i - new_i :]) == 1 and args[i - new_i].startswith("-"):
token = args[i - new_i] token = args[i - new_i]
valid_flags = [ self.raise_remaining_args_error(token, arg_states)
flag for flag in self._flag_map if flag.startswith(token)
]
if valid_flags:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
) from error
else:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Use --help to see available options."
) from error
else: else:
raise CommandArgumentError( raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}" f"Invalid value for '{spec.dest}': {error}"
@ -606,6 +635,7 @@ class CommandArgumentParser:
raise CommandArgumentError( raise CommandArgumentError(
f"[{spec.dest}] Action failed: {error}" f"[{spec.dest}] Action failed: {error}"
) from error ) from error
arg_states[spec.dest].consumed = True
elif not typed and spec.default: elif not typed and spec.default:
result[spec.dest] = spec.default result[spec.dest] = spec.default
elif spec.action == ArgumentAction.APPEND: elif spec.action == ArgumentAction.APPEND:
@ -618,8 +648,10 @@ class CommandArgumentParser:
assert result.get(spec.dest) is not None, "dest should not be None" assert result.get(spec.dest) is not None, "dest should not be None"
result[spec.dest].extend(typed) result[spec.dest].extend(typed)
elif spec.nargs in (None, 1, "?"): elif spec.nargs in (None, 1, "?"):
arg_states[spec.dest].consumed = True
result[spec.dest] = typed[0] if len(typed) == 1 else typed result[spec.dest] = typed[0] if len(typed) == 1 else typed
else: else:
arg_states[spec.dest].consumed = True
result[spec.dest] = typed result[spec.dest] = typed
if spec.nargs not in ("*", "+"): if spec.nargs not in ("*", "+"):
@ -628,15 +660,7 @@ class CommandArgumentParser:
if i < len(args): if i < len(args):
if len(args[i:]) == 1 and args[i].startswith("-"): if len(args[i:]) == 1 and args[i].startswith("-"):
token = args[i] token = args[i]
valid_flags = [flag for flag in self._flag_map if flag.startswith(token)] self.raise_remaining_args_error(token, arg_states)
if valid_flags:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
)
else:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Use --help to see available options."
)
else: else:
plural = "s" if len(args[i:]) > 1 else "" plural = "s" if len(args[i:]) > 1 else ""
raise CommandArgumentError( raise CommandArgumentError(
@ -670,6 +694,7 @@ class CommandArgumentParser:
positional_args: list[Argument], positional_args: list[Argument],
consumed_positional_indices: set[int], consumed_positional_indices: set[int],
consumed_indices: set[int], consumed_indices: set[int],
arg_states: dict[str, ArgumentState],
from_validate: bool = False, from_validate: bool = False,
) -> int: ) -> int:
if token in self._keyword: if token in self._keyword:
@ -679,6 +704,7 @@ class CommandArgumentParser:
if action == ArgumentAction.HELP: if action == ArgumentAction.HELP:
if not from_validate: if not from_validate:
self.render_help() self.render_help()
arg_states[spec.dest].consumed = True
raise HelpSignal() raise HelpSignal()
elif action == ArgumentAction.ACTION: elif action == ArgumentAction.ACTION:
assert isinstance( assert isinstance(
@ -691,24 +717,29 @@ class CommandArgumentParser:
raise CommandArgumentError( raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}" f"Invalid value for '{spec.dest}': {error}"
) from error ) from error
try: if not spec.lazy_resolver or not from_validate:
result[spec.dest] = await spec.resolver(*typed_values) try:
except Exception as error: result[spec.dest] = await spec.resolver(*typed_values)
raise CommandArgumentError( except Exception as error:
f"[{spec.dest}] Action failed: {error}" raise CommandArgumentError(
) from error f"[{spec.dest}] Action failed: {error}"
) from error
arg_states[spec.dest].consumed = True
consumed_indices.update(range(i, new_i)) consumed_indices.update(range(i, new_i))
i = new_i i = new_i
elif action == ArgumentAction.STORE_TRUE: elif action == ArgumentAction.STORE_TRUE:
result[spec.dest] = True result[spec.dest] = True
arg_states[spec.dest].consumed = True
consumed_indices.add(i) consumed_indices.add(i)
i += 1 i += 1
elif action == ArgumentAction.STORE_FALSE: elif action == ArgumentAction.STORE_FALSE:
result[spec.dest] = False result[spec.dest] = False
arg_states[spec.dest].consumed = True
consumed_indices.add(i) consumed_indices.add(i)
i += 1 i += 1
elif action == ArgumentAction.STORE_BOOL_OPTIONAL: elif action == ArgumentAction.STORE_BOOL_OPTIONAL:
result[spec.dest] = spec.type(True) result[spec.dest] = spec.type(True)
arg_states[spec.dest].consumed = True
consumed_indices.add(i) consumed_indices.add(i)
i += 1 i += 1
elif action == ArgumentAction.COUNT: elif action == ArgumentAction.COUNT:
@ -778,19 +809,11 @@ class CommandArgumentParser:
) )
else: else:
result[spec.dest] = typed_values result[spec.dest] = typed_values
arg_states[spec.dest].consumed = True
consumed_indices.update(range(i, new_i)) consumed_indices.update(range(i, new_i))
i = new_i i = new_i
elif token.startswith("-"): elif token.startswith("-"):
# Handle unrecognized option self.raise_remaining_args_error(token, arg_states)
valid_flags = [flag for flag in self._flag_map if flag.startswith(token)]
if valid_flags:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Did you mean one of: {', '.join(valid_flags)}?"
)
else:
raise CommandArgumentError(
f"Unrecognized option '{token}'. Use --help to see available options."
)
else: else:
# Get the next flagged argument index if it exists # Get the next flagged argument index if it exists
next_flagged_index = -1 next_flagged_index = -1
@ -805,6 +828,7 @@ class CommandArgumentParser:
result, result,
positional_args, positional_args,
consumed_positional_indices, consumed_positional_indices,
arg_states=arg_states,
from_validate=from_validate, from_validate=from_validate,
) )
i += args_consumed i += args_consumed
@ -817,6 +841,14 @@ class CommandArgumentParser:
if args is None: if args is None:
args = [] args = []
arg_states = {arg.dest: ArgumentState(arg) for arg in self._arguments}
self._last_positional_states = {
arg.dest: arg_states[arg.dest] for arg in self._positional.values()
}
self._last_keyword_states = {
arg.dest: arg_states[arg.dest] for arg in self._keyword_list
}
result = {arg.dest: deepcopy(arg.default) for arg in self._arguments} result = {arg.dest: deepcopy(arg.default) for arg in self._arguments}
positional_args: list[Argument] = [ positional_args: list[Argument] = [
arg for arg in self._arguments if arg.positional arg for arg in self._arguments if arg.positional
@ -838,6 +870,7 @@ class CommandArgumentParser:
positional_args, positional_args,
consumed_positional_indices, consumed_positional_indices,
consumed_indices, consumed_indices,
arg_states=arg_states,
from_validate=from_validate, from_validate=from_validate,
) )
@ -862,6 +895,7 @@ class CommandArgumentParser:
) )
if spec.choices and result.get(spec.dest) not in spec.choices: if spec.choices and result.get(spec.dest) not in spec.choices:
arg_states[spec.dest].consumed = False
raise CommandArgumentError( raise CommandArgumentError(
f"Invalid value for '{spec.dest}': must be one of {{{', '.join(spec.choices)}}}" f"Invalid value for '{spec.dest}': must be one of {{{', '.join(spec.choices)}}}"
) )
@ -914,6 +948,91 @@ class CommandArgumentParser:
kwargs_dict[arg.dest] = parsed[arg.dest] kwargs_dict[arg.dest] = parsed[arg.dest]
return tuple(args_list), kwargs_dict return tuple(args_list), kwargs_dict
def suggest_next(self, args: list[str]) -> list[str]:
"""
Suggest the next possible flags or values given partially typed arguments.
This does NOT raise errors. It is intended for completions, not validation.
Returns:
A list of possible completions based on the current input.
"""
# Case 1: Next positional argument
next_non_consumed_positional: Argument | None = None
for state in self._last_positional_states.values():
if not state.consumed:
next_non_consumed_positional = state.arg
break
if next_non_consumed_positional:
if next_non_consumed_positional.choices:
return sorted(
(str(choice) for choice in next_non_consumed_positional.choices)
)
if next_non_consumed_positional.suggestions:
return sorted(next_non_consumed_positional.suggestions)
consumed_dests = [
state.arg.dest
for state in self._last_keyword_states.values()
if state.consumed
]
remaining_flags = [
flag for flag, arg in self._keyword.items() if arg.dest not in consumed_dests
]
last = args[-1]
next_to_last = args[-2] if len(args) > 1 else ""
suggestions: list[str] = []
# Case 2: Mid-flag (e.g., "--ver")
if last.startswith("-") and last not in self._keyword:
if (
len(args) > 1
and next_to_last in self._keyword
and next_to_last in remaining_flags
):
# If the last token is a mid-flag, suggest based on the previous flag
arg = self._keyword[next_to_last]
if arg.choices:
suggestions.extend(arg.choices)
elif arg.suggestions:
suggestions.extend(arg.suggestions)
else:
possible_flags = [
flag
for flag, arg in self._keyword.items()
if flag.startswith(last) and arg.dest not in consumed_dests
]
suggestions.extend(possible_flags)
# Case 3: Flag that expects a value (e.g., ["--tag"])
elif last in self._keyword:
arg = self._keyword[last]
if arg.choices:
suggestions.extend(arg.choices)
elif arg.suggestions:
suggestions.extend(arg.suggestions)
# Case 4: Last flag with choices mid-choice (e.g., ["--tag", "v"])
elif next_to_last in self._keyword:
arg = self._keyword[next_to_last]
if arg.choices and last not in arg.choices:
suggestions.extend(arg.choices)
elif (
arg.suggestions
and last not in arg.suggestions
and not any(last.startswith(suggestion) for suggestion in arg.suggestions)
and any(suggestion.startswith(last) for suggestion in arg.suggestions)
):
suggestions.extend(arg.suggestions)
else:
suggestions.extend(remaining_flags)
# Case 5: Suggest all remaining flags
else:
suggestions.extend(remaining_flags)
return sorted(set(suggestions))
def get_options_text(self, plain_text=False) -> str: def get_options_text(self, plain_text=False) -> str:
# Options # Options
# Add all keyword arguments to the options list # Add all keyword arguments to the options list

View File

@ -54,8 +54,10 @@ def infer_args_from_func(
if arg_type is bool: if arg_type is bool:
if param.default is False: if param.default is False:
action = "store_true" action = "store_true"
else: default = None
elif param.default is True:
action = "store_false" action = "store_false"
default = None
if arg_type is list: if arg_type is list:
action = "append" action = "append"
@ -75,6 +77,7 @@ def infer_args_from_func(
"action": action, "action": action,
"help": metadata.get("help", ""), "help": metadata.get("help", ""),
"choices": metadata.get("choices"), "choices": metadata.get("choices"),
"suggestions": metadata.get("suggestions"),
} }
) )

View File

@ -1 +1 @@
__version__ = "0.1.62" __version__ = "0.1.63"

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "falyx" name = "falyx"
version = "0.1.62" version = "0.1.63"
description = "Reliable and introspectable async CLI action framework." description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"] authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT" license = "MIT"

View File

@ -0,0 +1,18 @@
import pytest
from falyx.parser.command_argument_parser import CommandArgumentParser
@pytest.mark.asyncio
@pytest.mark.parametrize(
"input_tokens, expected",
[
([""], ["--help", "--tag", "-h"]),
(["--ta"], ["--tag"]),
(["--tag"], ["analytics", "build"]),
],
)
async def test_suggest_next(input_tokens, expected):
parser = CommandArgumentParser(...)
parser.add_argument("--tag", choices=["analytics", "build"])
assert sorted(parser.suggest_next(input_tokens)) == sorted(expected)