Add LoadFileAction, Rename ActionFactoryAction->ActionFactory, Rename falyx.action.mixins->falyx.action.action_mixins, fix bug unable to parse negative numbers in CommandArgumentParser

This commit is contained in:
2025-06-27 22:33:14 -04:00
parent 38f5f1e934
commit bb325684ac
23 changed files with 461 additions and 190 deletions

View File

@ -1,7 +1,7 @@
import asyncio
from falyx import Falyx
from falyx.action import ActionFactoryAction, ChainedAction, HTTPAction, SelectionAction
from falyx.action import ActionFactory, ChainedAction, HTTPAction, SelectionAction
# Selection of a post ID to fetch (just an example set)
post_selector = SelectionAction(
@ -24,7 +24,7 @@ async def build_post_action(post_id) -> HTTPAction:
)
post_factory = ActionFactoryAction(
post_factory = ActionFactory(
name="Build HTTPAction from Post ID",
factory=build_post_action,
inject_last_result=True,

View File

@ -29,7 +29,10 @@ flx.add_command(
),
arg_metadata={
"service": "Service name",
"region": {"help": "Deployment region", "choices": ["us-east-1", "us-west-2"]},
"region": {
"help": "Deployment region",
"choices": ["us-east-1", "us-west-2", "eu-west-1"],
},
"verbose": {"help": "Enable verbose mode"},
},
tags=["deployment", "service"],

View File

@ -6,7 +6,7 @@ Licensed under the MIT License. See LICENSE file for details.
"""
from .action import Action
from .action_factory import ActionFactoryAction
from .action_factory import ActionFactory
from .action_group import ActionGroup
from .base_action import BaseAction
from .chained_action import ChainedAction
@ -14,6 +14,7 @@ from .fallback_action import FallbackAction
from .http_action import HTTPAction
from .io_action import BaseIOAction
from .literal_input_action import LiteralInputAction
from .load_file_action import LoadFileAction
from .menu_action import MenuAction
from .process_action import ProcessAction
from .process_pool_action import ProcessPoolAction
@ -30,7 +31,7 @@ __all__ = [
"BaseAction",
"ChainedAction",
"ProcessAction",
"ActionFactoryAction",
"ActionFactory",
"HTTPAction",
"BaseIOAction",
"ShellAction",
@ -43,4 +44,5 @@ __all__ = [
"UserInputAction",
"PromptMenuAction",
"ProcessPoolAction",
"LoadFileAction",
]

View File

@ -2,7 +2,7 @@
"""action.py"""
from __future__ import annotations
from typing import Any, Callable
from typing import Any, Awaitable, Callable
from rich.tree import Tree
@ -42,9 +42,9 @@ class Action(BaseAction):
def __init__(
self,
name: str,
action: Callable[..., Any],
action: Callable[..., Any] | Callable[..., Awaitable[Any]],
*,
rollback: Callable[..., Any] | None = None,
rollback: Callable[..., Any] | Callable[..., Awaitable[Any]] | None = None,
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
hooks: HookManager | None = None,
@ -69,19 +69,19 @@ class Action(BaseAction):
self.enable_retry()
@property
def action(self) -> Callable[..., Any]:
def action(self) -> Callable[..., Awaitable[Any]]:
return self._action
@action.setter
def action(self, value: Callable[..., Any]):
def action(self, value: Callable[..., Awaitable[Any]]):
self._action = ensure_async(value)
@property
def rollback(self) -> Callable[..., Any] | None:
def rollback(self) -> Callable[..., Awaitable[Any]] | None:
return self._rollback
@rollback.setter
def rollback(self, value: Callable[..., Any] | None):
def rollback(self, value: Callable[..., Awaitable[Any]] | None):
if value is None:
self._rollback = None
else:

View File

@ -1,5 +1,5 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""action_factory.py"""
"""action_factory_action.py"""
from typing import Any, Callable
from rich.tree import Tree
@ -14,7 +14,7 @@ from falyx.themes import OneColors
from falyx.utils import ensure_async
class ActionFactoryAction(BaseAction):
class ActionFactory(BaseAction):
"""
Dynamically creates and runs another Action at runtime using a factory function.

View File

@ -2,14 +2,15 @@
"""action_group.py"""
import asyncio
import random
from typing import Any, Callable, Sequence
from typing import Any, Awaitable, Callable, Sequence
from rich.tree import Tree
from falyx.action.action import Action
from falyx.action.action_mixins import ActionListMixin
from falyx.action.base_action import BaseAction
from falyx.action.mixins import ActionListMixin
from falyx.context import ExecutionContext, SharedContext
from falyx.exceptions import EmptyGroupError
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.logger import logger
@ -54,7 +55,9 @@ class ActionGroup(BaseAction, ActionListMixin):
def __init__(
self,
name: str,
actions: Sequence[BaseAction | Callable[..., Any]] | None = None,
actions: (
Sequence[BaseAction | Callable[..., Any] | Callable[..., Awaitable]] | None
) = None,
*,
hooks: HookManager | None = None,
inject_last_result: bool = False,
@ -104,6 +107,8 @@ class ActionGroup(BaseAction, ActionListMixin):
return None, None
async def _run(self, *args, **kwargs) -> list[tuple[str, Any]]:
if not self.actions:
raise EmptyGroupError(f"[{self.name}] No actions to execute.")
shared_context = SharedContext(name=self.name, action=self, is_parallel=True)
if self.shared_context:
shared_context.set_shared_result(self.shared_context.last_result())

View File

@ -1,5 +1,5 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""mixins.py"""
"""action_mixins.py"""
from typing import Sequence
from falyx.action.base_action import BaseAction

View File

@ -38,7 +38,6 @@ from rich.tree import Tree
from falyx.context import SharedContext
from falyx.debug import register_debug_hooks
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import Hook, HookManager, HookType
from falyx.logger import logger
from falyx.options_manager import OptionsManager

View File

@ -2,15 +2,15 @@
"""chained_action.py"""
from __future__ import annotations
from typing import Any, Callable, Sequence
from typing import Any, Awaitable, Callable, Sequence
from rich.tree import Tree
from falyx.action.action import Action
from falyx.action.action_mixins import ActionListMixin
from falyx.action.base_action import BaseAction
from falyx.action.fallback_action import FallbackAction
from falyx.action.literal_input_action import LiteralInputAction
from falyx.action.mixins import ActionListMixin
from falyx.context import ExecutionContext, SharedContext
from falyx.exceptions import EmptyChainError
from falyx.execution_registry import ExecutionRegistry as er
@ -47,7 +47,10 @@ class ChainedAction(BaseAction, ActionListMixin):
def __init__(
self,
name: str,
actions: Sequence[BaseAction | Callable[..., Any]] | None = None,
actions: (
Sequence[BaseAction | Callable[..., Any] | Callable[..., Awaitable[Any]]]
| None
) = None,
*,
hooks: HookManager | None = None,
inject_last_result: bool = False,

View File

@ -0,0 +1,196 @@
# Falyx CLI Framework — (c) 2025 rtj.dev LLC — MIT Licensed
"""load_file_action.py"""
import csv
import json
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import Path
from typing import Any
import toml
import yaml
from rich.tree import Tree
from falyx.action.action_types import FileType
from falyx.action.base_action import BaseAction
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType
from falyx.logger import logger
from falyx.themes import OneColors
class LoadFileAction(BaseAction):
"""LoadFileAction allows loading and parsing files of various types."""
def __init__(
self,
name: str,
file_path: str | Path | None = None,
file_type: FileType | str = FileType.TEXT,
inject_last_result: bool = False,
inject_into: str = "file_path",
):
super().__init__(
name=name, inject_last_result=inject_last_result, inject_into=inject_into
)
self._file_path = self._coerce_file_path(file_path)
self._file_type = self._coerce_file_type(file_type)
@property
def file_path(self) -> Path | None:
"""Get the file path as a Path object."""
return self._file_path
@file_path.setter
def file_path(self, value: str | Path):
"""Set the file path, converting to Path if necessary."""
self._file_path = self._coerce_file_path(value)
def _coerce_file_path(self, file_path: str | Path | None) -> Path | None:
"""Coerce the file path to a Path object."""
if isinstance(file_path, Path):
return file_path
elif isinstance(file_path, str):
return Path(file_path)
elif file_path is None:
return None
else:
raise TypeError("file_path must be a string or Path object")
@property
def file_type(self) -> FileType:
"""Get the file type."""
return self._file_type
@file_type.setter
def file_type(self, value: FileType | str):
"""Set the file type, converting to FileType if necessary."""
self._file_type = self._coerce_file_type(value)
def _coerce_file_type(self, file_type: FileType | str) -> FileType:
"""Coerce the file type to a FileType enum."""
if isinstance(file_type, FileType):
return file_type
elif isinstance(file_type, str):
return FileType(file_type)
else:
raise TypeError("file_type must be a FileType enum or string")
def get_infer_target(self) -> tuple[None, None]:
return None, None
def load_file(self) -> Any:
if self.file_path is None:
raise ValueError("file_path must be set before loading a file")
value: Any = None
try:
if self.file_type == FileType.TEXT:
value = self.file_path.read_text(encoding="UTF-8")
elif self.file_type == FileType.PATH:
value = self.file_path
elif self.file_type == FileType.JSON:
value = json.loads(self.file_path.read_text(encoding="UTF-8"))
elif self.file_type == FileType.TOML:
value = toml.loads(self.file_path.read_text(encoding="UTF-8"))
elif self.file_type == FileType.YAML:
value = yaml.safe_load(self.file_path.read_text(encoding="UTF-8"))
elif self.file_type == FileType.CSV:
with open(self.file_path, newline="", encoding="UTF-8") as csvfile:
reader = csv.reader(csvfile)
value = list(reader)
elif self.file_type == FileType.TSV:
with open(self.file_path, newline="", encoding="UTF-8") as tsvfile:
reader = csv.reader(tsvfile, delimiter="\t")
value = list(reader)
elif self.file_type == FileType.XML:
tree = ET.parse(self.file_path, parser=ET.XMLParser(encoding="UTF-8"))
root = tree.getroot()
value = ET.tostring(root, encoding="unicode")
else:
raise ValueError(f"Unsupported return type: {self.file_type}")
except Exception as error:
logger.error("Failed to parse %s: %s", self.file_path.name, error)
return value
async def _run(self, *args, **kwargs) -> Any:
context = ExecutionContext(name=self.name, args=args, kwargs=kwargs, action=self)
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
if "file_path" in kwargs:
self.file_path = kwargs["file_path"]
elif self.inject_last_result and self.last_result:
self.file_path = self.last_result
if self.file_path is None:
raise ValueError("file_path must be set before loading a file")
elif not self.file_path.exists():
raise FileNotFoundError(f"File not found: {self.file_path}")
elif not self.file_path.is_file():
raise ValueError(f"Path is not a regular file: {self.file_path}")
result = self.load_file()
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return result
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
raise
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.GREEN}]📄 LoadFileAction[/] '{self.name}'"
tree = parent.add(label) if parent else Tree(label)
tree.add(f"[dim]Path:[/] {self.file_path}")
tree.add(f"[dim]Type:[/] {self.file_type.name if self.file_type else 'None'}")
if self.file_path is None:
tree.add(f"[{OneColors.DARK_RED_b}]❌ File path is not set[/]")
elif not self.file_path.exists():
tree.add(f"[{OneColors.DARK_RED_b}]❌ File does not exist[/]")
elif not self.file_path.is_file():
tree.add(f"[{OneColors.LIGHT_YELLOW_b}]⚠️ Not a regular file[/]")
else:
try:
stat = self.file_path.stat()
tree.add(f"[dim]Size:[/] {stat.st_size:,} bytes")
tree.add(
f"[dim]Modified:[/] {datetime.fromtimestamp(stat.st_mtime):%Y-%m-%d %H:%M:%S}"
)
tree.add(
f"[dim]Created:[/] {datetime.fromtimestamp(stat.st_ctime):%Y-%m-%d %H:%M:%S}"
)
if self.file_type == FileType.TEXT:
preview_lines = self.file_path.read_text(
encoding="UTF-8"
).splitlines()[:10]
content_tree = tree.add("[dim]Preview (first 10 lines):[/]")
for line in preview_lines:
content_tree.add(f"[dim]{line}[/]")
elif self.file_type in {FileType.JSON, FileType.YAML, FileType.TOML}:
raw = self.load_file()
if raw is not None:
preview_str = (
json.dumps(raw, indent=2)
if isinstance(raw, dict)
else str(raw)
)
preview_lines = preview_str.splitlines()[:10]
content_tree = tree.add("[dim]Parsed preview:[/]")
for line in preview_lines:
content_tree.add(f"[dim]{line}[/]")
except Exception as e:
tree.add(f"[{OneColors.DARK_RED_b}]❌ Error reading file:[/] {e}")
if not parent:
self.console.print(tree)
def __str__(self) -> str:
return f"LoadFileAction(file_path={self.file_path}, file_type={self.file_type})"

View File

@ -7,12 +7,13 @@ import random
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
from functools import partial
from typing import Any, Callable
from typing import Any, Callable, Sequence
from rich.tree import Tree
from falyx.action.base_action import BaseAction
from falyx.context import ExecutionContext, SharedContext
from falyx.exceptions import EmptyPoolError
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookManager, HookType
from falyx.logger import logger
@ -37,7 +38,7 @@ class ProcessPoolAction(BaseAction):
def __init__(
self,
name: str,
actions: list[ProcessTask] | None = None,
actions: Sequence[ProcessTask] | None = None,
*,
hooks: HookManager | None = None,
executor: ProcessPoolExecutor | None = None,
@ -56,7 +57,7 @@ class ProcessPoolAction(BaseAction):
if actions:
self.set_actions(actions)
def set_actions(self, actions: list[ProcessTask]) -> None:
def set_actions(self, actions: Sequence[ProcessTask]) -> None:
"""Replaces the current action list with a new one."""
self.actions.clear()
for action in actions:
@ -78,6 +79,8 @@ class ProcessPoolAction(BaseAction):
return None, None
async def _run(self, *args, **kwargs) -> Any:
if not self.actions:
raise EmptyPoolError(f"[{self.name}] No actions to execute.")
shared_context = SharedContext(name=self.name, action=self, is_parallel=True)
if self.shared_context:
shared_context.set_shared_result(self.shared_context.last_result())

View File

@ -107,7 +107,10 @@ class SelectFileAction(BaseAction):
def _coerce_return_type(self, return_type: FileType | str) -> FileType:
if isinstance(return_type, FileType):
return return_type
return FileType(return_type)
elif isinstance(return_type, str):
return FileType(return_type)
else:
raise TypeError("return_type must be a FileType enum or string")
def get_options(self, files: list[Path]) -> dict[str, SelectionOption]:
value: Any

View File

@ -19,7 +19,7 @@ in building robust interactive menus.
from __future__ import annotations
import shlex
from typing import Any, Callable
from typing import Any, Awaitable, Callable
from prompt_toolkit.formatted_text import FormattedText
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
@ -105,7 +105,7 @@ class Command(BaseModel):
key: str
description: str
action: BaseAction | Callable[..., Any]
action: BaseAction | Callable[..., Any] | Callable[..., Awaitable[Any]]
args: tuple = ()
kwargs: dict[str, Any] = Field(default_factory=dict)
hidden: bool = False

View File

@ -30,5 +30,13 @@ class EmptyChainError(FalyxError):
"""Exception raised when the chain is empty."""
class EmptyGroupError(FalyxError):
"""Exception raised when the chain is empty."""
class EmptyPoolError(FalyxError):
"""Exception raised when the chain is empty."""
class CommandArgumentError(FalyxError):
"""Exception raised when there is an error in the command argument parser."""

View File

@ -2,4 +2,4 @@
"""logger.py"""
import logging
logger = logging.getLogger("falyx")
logger: logging.Logger = logging.getLogger("falyx")

View File

@ -46,6 +46,7 @@ class Argument:
ArgumentAction.STORE,
ArgumentAction.APPEND,
ArgumentAction.EXTEND,
ArgumentAction.ACTION,
)
and not self.positional
):
@ -54,6 +55,7 @@ class Argument:
ArgumentAction.STORE,
ArgumentAction.APPEND,
ArgumentAction.EXTEND,
ArgumentAction.ACTION,
) or isinstance(self.nargs, str):
choice_text = self.dest

View File

@ -177,20 +177,19 @@ class CommandArgumentParser:
else:
choices = []
for choice in choices:
if not isinstance(choice, expected_type):
try:
coerce_value(choice, expected_type)
except Exception as error:
raise CommandArgumentError(
f"Invalid choice {choice!r}: not coercible to {expected_type.__name__} error: {error}"
) from error
try:
coerce_value(choice, expected_type)
except Exception as error:
raise CommandArgumentError(
f"Invalid choice {choice!r}: not coercible to {expected_type.__name__} error: {error}"
) from error
return choices
def _validate_default_type(
self, default: Any, expected_type: type, dest: str
) -> None:
"""Validate the default value type."""
if default is not None and not isinstance(default, expected_type):
if default is not None:
try:
coerce_value(default, expected_type)
except Exception as error:
@ -203,13 +202,12 @@ class CommandArgumentParser:
) -> None:
if isinstance(default, list):
for item in default:
if not isinstance(item, expected_type):
try:
coerce_value(item, expected_type)
except Exception as error:
raise CommandArgumentError(
f"Default list value {default!r} for '{dest}' cannot be coerced to {expected_type.__name__} error: {error}"
) from error
try:
coerce_value(item, expected_type)
except Exception as error:
raise CommandArgumentError(
f"Default list value {default!r} for '{dest}' cannot be coerced to {expected_type.__name__} error: {error}"
) from error
def _validate_resolver(
self, action: ArgumentAction, resolver: BaseAction | None
@ -422,22 +420,22 @@ class CommandArgumentParser:
raise CommandArgumentError(
f"Expected at least one value for '{spec.dest}'"
)
while i < len(args) and not args[i].startswith("-"):
while i < len(args) and args[i] not in self._keyword:
values.append(args[i])
i += 1
assert values, "Expected at least one value for '+' nargs: shouldn't happen"
return values, i
elif spec.nargs == "*":
while i < len(args) and not args[i].startswith("-"):
while i < len(args) and args[i] not in self._keyword:
values.append(args[i])
i += 1
return values, i
elif spec.nargs == "?":
if i < len(args) and not args[i].startswith("-"):
if i < len(args) and args[i] not in self._keyword:
return [args[i]], i + 1
return [], i
elif spec.nargs is None:
if i < len(args) and not args[i].startswith("-"):
if i < len(args) and args[i] not in self._keyword:
return [args[i]], i + 1
return [], i
assert False, "Invalid nargs value: shouldn't happen"
@ -524,23 +522,142 @@ class CommandArgumentParser:
return i
def _expand_posix_bundling(self, args: list[str]) -> list[str]:
def _expand_posix_bundling(self, token: str) -> list[str] | str:
"""Expand POSIX-style bundled arguments into separate arguments."""
expanded = []
for token in args:
if token.startswith("-") and not token.startswith("--") and len(token) > 2:
# POSIX bundle
# e.g. -abc -> -a -b -c
for char in token[1:]:
flag = f"-{char}"
arg = self._flag_map.get(flag)
if not arg:
raise CommandArgumentError(f"Unrecognized option: {flag}")
expanded.append(flag)
else:
expanded.append(token)
if token.startswith("-") and not token.startswith("--") and len(token) > 2:
# POSIX bundle
# e.g. -abc -> -a -b -c
for char in token[1:]:
flag = f"-{char}"
arg = self._flag_map.get(flag)
if not arg:
raise CommandArgumentError(f"Unrecognized option: {flag}")
expanded.append(flag)
else:
return token
return expanded
async def _handle_token(
self,
token: str,
args: list[str],
i: int,
result: dict[str, Any],
positional_args: list[Argument],
consumed_positional_indices: set[int],
consumed_indices: set[int],
from_validate: bool = False,
) -> int:
if token in self._keyword:
spec = self._keyword[token]
action = spec.action
if action == ArgumentAction.HELP:
if not from_validate:
self.render_help()
raise HelpSignal()
elif action == ArgumentAction.ACTION:
assert isinstance(
spec.resolver, BaseAction
), "resolver should be an instance of BaseAction"
values, new_i = self._consume_nargs(args, i + 1, spec)
try:
typed_values = [coerce_value(value, spec.type) for value in values]
except ValueError as error:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
try:
result[spec.dest] = await spec.resolver(*typed_values)
except Exception as error:
raise CommandArgumentError(
f"[{spec.dest}] Action failed: {error}"
) from error
consumed_indices.update(range(i, new_i))
i = new_i
elif action == ArgumentAction.STORE_TRUE:
result[spec.dest] = True
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.STORE_FALSE:
result[spec.dest] = False
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.COUNT:
result[spec.dest] = result.get(spec.dest, 0) + 1
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.APPEND:
assert result.get(spec.dest) is not None, "dest should not be None"
values, new_i = self._consume_nargs(args, i + 1, spec)
try:
typed_values = [coerce_value(value, spec.type) for value in values]
except ValueError as error:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
if spec.nargs is None:
result[spec.dest].append(spec.type(values[0]))
else:
result[spec.dest].append(typed_values)
consumed_indices.update(range(i, new_i))
i = new_i
elif action == ArgumentAction.EXTEND:
assert result.get(spec.dest) is not None, "dest should not be None"
values, new_i = self._consume_nargs(args, i + 1, spec)
try:
typed_values = [coerce_value(value, spec.type) for value in values]
except ValueError as error:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
result[spec.dest].extend(typed_values)
consumed_indices.update(range(i, new_i))
i = new_i
else:
values, new_i = self._consume_nargs(args, i + 1, spec)
try:
typed_values = [coerce_value(value, spec.type) for value in values]
except ValueError as error:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
if not typed_values and spec.nargs not in ("*", "?"):
raise CommandArgumentError(
f"Expected at least one value for '{spec.dest}'"
)
if spec.nargs in (None, 1, "?") and spec.action != ArgumentAction.APPEND:
result[spec.dest] = (
typed_values[0] if len(typed_values) == 1 else typed_values
)
else:
result[spec.dest] = typed_values
consumed_indices.update(range(i, new_i))
i = new_i
elif token.startswith("-"):
# Handle unrecognized option
raise CommandArgumentError(f"Unrecognized flag: {token}")
else:
# Get the next flagged argument index if it exists
next_flagged_index = -1
for index, arg in enumerate(args[i:], start=i):
if arg in self._keyword:
next_flagged_index = index
break
print(f"next_flagged_index: {next_flagged_index}")
print(f"{self._keyword_list=}")
if next_flagged_index == -1:
next_flagged_index = len(args)
args_consumed = await self._consume_all_positional_args(
args[i:next_flagged_index],
result,
positional_args,
consumed_positional_indices,
)
i += args_consumed
return i
async def parse_args(
self, args: list[str] | None = None, from_validate: bool = False
) -> dict[str, Any]:
@ -548,132 +665,29 @@ class CommandArgumentParser:
if args is None:
args = []
args = self._expand_posix_bundling(args)
result = {arg.dest: deepcopy(arg.default) for arg in self._arguments}
positional_args = [arg for arg in self._arguments if arg.positional]
positional_args: list[Argument] = [
arg for arg in self._arguments if arg.positional
]
consumed_positional_indices: set[int] = set()
consumed_indices: set[int] = set()
i = 0
while i < len(args):
token = args[i]
if token in self._keyword:
spec = self._keyword[token]
action = spec.action
if action == ArgumentAction.HELP:
if not from_validate:
self.render_help()
raise HelpSignal()
elif action == ArgumentAction.ACTION:
assert isinstance(
spec.resolver, BaseAction
), "resolver should be an instance of BaseAction"
values, new_i = self._consume_nargs(args, i + 1, spec)
try:
typed_values = [
coerce_value(value, spec.type) for value in values
]
except ValueError as error:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
try:
result[spec.dest] = await spec.resolver(*typed_values)
except Exception as error:
raise CommandArgumentError(
f"[{spec.dest}] Action failed: {error}"
) from error
consumed_indices.update(range(i, new_i))
i = new_i
elif action == ArgumentAction.STORE_TRUE:
result[spec.dest] = True
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.STORE_FALSE:
result[spec.dest] = False
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.COUNT:
result[spec.dest] = result.get(spec.dest, 0) + 1
consumed_indices.add(i)
i += 1
elif action == ArgumentAction.APPEND:
assert result.get(spec.dest) is not None, "dest should not be None"
values, new_i = self._consume_nargs(args, i + 1, spec)
try:
typed_values = [
coerce_value(value, spec.type) for value in values
]
except ValueError as error:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
if spec.nargs is None:
result[spec.dest].append(spec.type(values[0]))
else:
result[spec.dest].append(typed_values)
consumed_indices.update(range(i, new_i))
i = new_i
elif action == ArgumentAction.EXTEND:
assert result.get(spec.dest) is not None, "dest should not be None"
values, new_i = self._consume_nargs(args, i + 1, spec)
try:
typed_values = [
coerce_value(value, spec.type) for value in values
]
except ValueError as error:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
result[spec.dest].extend(typed_values)
consumed_indices.update(range(i, new_i))
i = new_i
else:
values, new_i = self._consume_nargs(args, i + 1, spec)
try:
typed_values = [
coerce_value(value, spec.type) for value in values
]
except ValueError as error:
raise CommandArgumentError(
f"Invalid value for '{spec.dest}': {error}"
) from error
if not typed_values and spec.nargs not in ("*", "?"):
raise CommandArgumentError(
f"Expected at least one value for '{spec.dest}'"
)
if (
spec.nargs in (None, 1, "?")
and spec.action != ArgumentAction.APPEND
):
result[spec.dest] = (
typed_values[0] if len(typed_values) == 1 else typed_values
)
else:
result[spec.dest] = typed_values
consumed_indices.update(range(i, new_i))
i = new_i
elif token.startswith("-"):
# Handle unrecognized option
raise CommandArgumentError(f"Unrecognized flag: {token}")
else:
# Get the next flagged argument index if it exists
next_flagged_index = -1
for index, arg in enumerate(args[i:], start=i):
if arg.startswith("-"):
next_flagged_index = index
break
if next_flagged_index == -1:
next_flagged_index = len(args)
args_consumed = await self._consume_all_positional_args(
args[i:next_flagged_index],
result,
positional_args,
consumed_positional_indices,
)
i += args_consumed
token = self._expand_posix_bundling(args[i])
if isinstance(token, list):
args[i : i + 1] = token
token = args[i]
i = await self._handle_token(
token,
args,
i,
result,
positional_args,
consumed_positional_indices,
consumed_indices,
from_validate=from_validate,
)
# Required validation
for spec in self._arguments:
@ -797,6 +811,8 @@ class CommandArgumentParser:
flags = arg.get_positional_text()
arg_line = Text(f" {flags:<30} ")
help_text = arg.help or ""
if help_text and len(flags) > 30:
help_text = f"\n{'':<33}{help_text}"
arg_line.append(help_text)
self.console.print(arg_line)
self.console.print("[bold]options:[/bold]")
@ -805,6 +821,8 @@ class CommandArgumentParser:
flags_choice = f"{flags} {arg.get_choice_text()}"
arg_line = Text(f" {flags_choice:<30} ")
help_text = arg.help or ""
if help_text and len(flags_choice) > 30:
help_text = f"\n{'':<33}{help_text}"
arg_line.append(help_text)
self.console.print(arg_line)

View File

@ -33,7 +33,6 @@ def coerce_enum(value: Any, enum_type: EnumMeta) -> Any:
pass
base_type = type(next(iter(enum_type)).value)
print(base_type)
try:
coerced_value = base_type(value)
return enum_type(coerced_value)

View File

@ -2,14 +2,16 @@
"""protocols.py"""
from __future__ import annotations
from typing import Any, Awaitable, Protocol, runtime_checkable
from typing import Any, Awaitable, Callable, Protocol, runtime_checkable
from falyx.action.base_action import BaseAction
@runtime_checkable
class ActionFactoryProtocol(Protocol):
async def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[BaseAction]: ...
async def __call__(
self, *args: Any, **kwargs: Any
) -> Callable[..., Awaitable[BaseAction]]: ...
@runtime_checkable

View File

@ -1 +1 @@
__version__ = "0.1.53"
__version__ = "0.1.54"

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "falyx"
version = "0.1.53"
version = "0.1.54"
description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT"
@ -27,6 +27,10 @@ black = { version = "^25.0", allow-prereleases = true }
mypy = { version = "^1.0", allow-prereleases = true }
isort = { version = "^5.0", allow-prereleases = true }
pytest-cov = "^4.0"
mkdocs = "^1.6.1"
mkdocs-material = "^9.6.14"
mkdocstrings = {extras = ["python"], version = "^0.29.1"}
mike = "^2.1.3"
[tool.poetry.scripts]
falyx = "falyx.__main__:main"

View File

@ -1,6 +1,6 @@
import pytest
from falyx.action import Action, ActionFactoryAction, ChainedAction
from falyx.action import Action, ActionFactory, ChainedAction
def make_chain(value) -> ChainedAction:
@ -16,9 +16,7 @@ def make_chain(value) -> ChainedAction:
@pytest.mark.asyncio
async def test_action_factory_action():
action = ActionFactoryAction(
name="test_action", factory=make_chain, args=("test_value",)
)
action = ActionFactory(name="test_action", factory=make_chain, args=("test_value",))
result = await action()

View File

@ -0,0 +1,26 @@
import pytest
from falyx.exceptions import CommandArgumentError
from falyx.parser import CommandArgumentParser
@pytest.mark.asyncio
async def test_parse_negative_integer():
parser = CommandArgumentParser()
parser.add_argument("--number", type=int, required=True, help="A negative integer")
args = await parser.parse_args(["--number", "-42"])
assert args["number"] == -42
@pytest.mark.asyncio
async def test_parse_negative_float():
parser = CommandArgumentParser()
parser.add_argument("--value", type=float, required=True, help="A negative float")
args = await parser.parse_args(["--value", "-3.14"])
assert args["value"] == -3.14
def test_parse_number_flag():
parser = CommandArgumentParser()
with pytest.raises(CommandArgumentError):
parser.add_argument("-1", type=int, required=True, help="A negative number flag")