Add SelectFileAction, Remove GrepAction, UppercaseIO

This commit is contained in:
Roland Thomas Jr 2025-05-10 01:08:34 -04:00
parent 76e542cfce
commit 9351ae658c
Signed by: roland
GPG Key ID: 7C3C2B085A4C2872
11 changed files with 242 additions and 71 deletions

View File

@ -64,6 +64,7 @@ class BaseAction(ABC):
def __init__(
self,
name: str,
*,
hooks: HookManager | None = None,
inject_last_result: bool = False,
inject_into: str = "last_result",
@ -182,6 +183,7 @@ class Action(BaseAction):
self,
name: str,
action: Callable[..., Any],
*,
rollback: Callable[..., Any] | None = None,
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
@ -191,7 +193,12 @@ class Action(BaseAction):
retry: bool = False,
retry_policy: RetryPolicy | None = None,
) -> None:
super().__init__(name, hooks, inject_last_result, inject_into)
super().__init__(
name,
hooks=hooks,
inject_last_result=inject_last_result,
inject_into=inject_into,
)
self.action = action
self.rollback = rollback
self.args = args
@ -422,13 +429,19 @@ class ChainedAction(BaseAction, ActionListMixin):
self,
name: str,
actions: list[BaseAction | Any] | None = None,
*,
hooks: HookManager | None = None,
inject_last_result: bool = False,
inject_into: str = "last_result",
auto_inject: bool = False,
return_list: bool = False,
) -> None:
super().__init__(name, hooks, inject_last_result, inject_into)
super().__init__(
name,
hooks=hooks,
inject_last_result=inject_last_result,
inject_into=inject_into,
)
ActionListMixin.__init__(self)
self.auto_inject = auto_inject
self.return_list = return_list
@ -608,11 +621,17 @@ class ActionGroup(BaseAction, ActionListMixin):
self,
name: str,
actions: list[BaseAction] | None = None,
*,
hooks: HookManager | None = None,
inject_last_result: bool = False,
inject_into: str = "last_result",
):
super().__init__(name, hooks, inject_last_result, inject_into)
super().__init__(
name,
hooks=hooks,
inject_last_result=inject_last_result,
inject_into=inject_into,
)
ActionListMixin.__init__(self)
if actions:
self.set_actions(actions)
@ -730,7 +749,8 @@ class ProcessAction(BaseAction):
def __init__(
self,
name: str,
func: Callable[..., Any],
action: Callable[..., Any],
*,
args: tuple = (),
kwargs: dict[str, Any] | None = None,
hooks: HookManager | None = None,
@ -738,8 +758,13 @@ class ProcessAction(BaseAction):
inject_last_result: bool = False,
inject_into: str = "last_result",
):
super().__init__(name, hooks, inject_last_result, inject_into)
self.func = func
super().__init__(
name,
hooks=hooks,
inject_last_result=inject_last_result,
inject_into=inject_into,
)
self.action = action
self.args = args
self.kwargs = kwargs or {}
self.executor = executor or ProcessPoolExecutor()
@ -767,7 +792,7 @@ class ProcessAction(BaseAction):
try:
await self.hooks.trigger(HookType.BEFORE, context)
result = await loop.run_in_executor(
self.executor, partial(self.func, *combined_args, **combined_kwargs)
self.executor, partial(self.action, *combined_args, **combined_kwargs)
)
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
@ -806,6 +831,6 @@ class ProcessAction(BaseAction):
def __str__(self) -> str:
return (
f"ProcessAction(name={self.name!r}, func={getattr(self.func, '__name__', repr(self.func))}, "
f"ProcessAction(name={self.name!r}, action={getattr(self.action, '__name__', repr(self.action))}, "
f"args={self.args!r}, kwargs={self.kwargs!r})"
)

View File

@ -30,7 +30,7 @@ class BottomBar:
key_validator: Callable[[str], bool] | None = None,
) -> None:
self.columns = columns
self.console = Console()
self.console = Console(color_system="auto")
self._named_items: dict[str, Callable[[], HTML]] = {}
self._value_getters: dict[str, Callable[[], Any]] = CaseInsensitiveDict()
self.toggle_keys: list[str] = []

View File

@ -40,7 +40,7 @@ from falyx.retry_utils import enable_retries_recursively
from falyx.themes.colors import OneColors
from falyx.utils import _noop, confirm_async, ensure_async, logger
console = Console()
console = Console(color_system="auto")
class Command(BaseModel):

View File

@ -15,7 +15,7 @@ from falyx.utils import logger
class ExecutionRegistry:
_store_by_name: Dict[str, List[ExecutionContext]] = defaultdict(list)
_store_all: List[ExecutionContext] = []
_console = Console(color_system="truecolor")
_console = Console(color_system="auto")
@classmethod
def record(cls, context: ExecutionContext):

View File

@ -110,12 +110,12 @@ class Falyx:
register_all_hooks(): Register hooks across all commands and submenus.
debug_hooks(): Log hook registration for debugging.
build_default_table(): Construct the standard Rich table layout.
"""
def __init__(
self,
title: str | Markdown = "Menu",
*,
prompt: str | AnyFormattedText = "> ",
columns: int = 3,
bottom_bar: BottomBar | str | Callable[[], Any] | None = None,
@ -143,7 +143,7 @@ class Falyx:
self.help_command: Command | None = (
self._get_help_command() if include_help_command else None
)
self.console: Console = Console(color_system="truecolor", theme=get_nord_theme())
self.console: Console = Console(color_system="auto", theme=get_nord_theme())
self.welcome_message: str | Markdown | dict[str, Any] = welcome_message
self.exit_message: str | Markdown | dict[str, Any] = exit_message
self.hooks: HookManager = HookManager()
@ -549,7 +549,7 @@ class Falyx:
)
def add_submenu(
self, key: str, description: str, submenu: "Falyx", style: str = OneColors.CYAN
self, key: str, description: str, submenu: "Falyx", *, style: str = OneColors.CYAN
) -> None:
"""Adds a submenu to the menu."""
if not isinstance(submenu, Falyx):
@ -568,6 +568,7 @@ class Falyx:
key: str,
description: str,
action: BaseAction | Callable[[], Any],
*,
args: tuple = (),
kwargs: dict[str, Any] = {},
hidden: bool = False,

View File

@ -20,7 +20,6 @@ import subprocess
import sys
from typing import Any
from rich.console import Console
from rich.tree import Tree
from falyx.action import BaseAction
@ -31,8 +30,6 @@ from falyx.hook_manager import HookManager, HookType
from falyx.themes.colors import OneColors
from falyx.utils import logger
console = Console()
class BaseIOAction(BaseAction):
"""
@ -172,22 +169,7 @@ class BaseIOAction(BaseAction):
if parent:
parent.add("".join(label))
else:
console.print(Tree("".join(label)))
class UppercaseIO(BaseIOAction):
def from_input(self, raw: str | bytes) -> str:
if not isinstance(raw, (str, bytes)):
raise TypeError(
f"{self.name} expected str or bytes input, got {type(raw).__name__}"
)
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
async def _run(self, parsed_input: str, *args, **kwargs) -> str:
return parsed_input.upper()
def to_output(self, data: str) -> str:
return data + "\n"
self.console.print(Tree("".join(label)))
class ShellAction(BaseIOAction):
@ -247,41 +229,9 @@ class ShellAction(BaseIOAction):
if parent:
parent.add("".join(label))
else:
console.print(Tree("".join(label)))
self.console.print(Tree("".join(label)))
def __str__(self):
return (
f"ShellAction(name={self.name!r}, command_template={self.command_template!r})"
)
class GrepAction(BaseIOAction):
def __init__(self, name: str, pattern: str, **kwargs):
super().__init__(name=name, **kwargs)
self.pattern = pattern
def from_input(self, raw: str | bytes) -> str:
if not isinstance(raw, (str, bytes)):
raise TypeError(
f"{self.name} expected str or bytes input, got {type(raw).__name__}"
)
return raw.strip() if isinstance(raw, str) else raw.decode("utf-8").strip()
async def _run(self, parsed_input: str) -> str:
command = ["grep", "-n", self.pattern]
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdout, stderr = process.communicate(input=parsed_input)
if process.returncode == 1:
return ""
if process.returncode != 0:
raise RuntimeError(stderr.strip())
return stdout.strip()
def to_output(self, result: str) -> str:
return result

View File

@ -45,7 +45,9 @@ class MenuOptionMap(CaseInsensitiveDict):
RESERVED_KEYS = {"Q", "B"}
def __init__(
self, options: dict[str, MenuOption] | None = None, allow_reserved: bool = False
self,
options: dict[str, MenuOption] | None = None,
allow_reserved: bool = False,
):
super().__init__()
self.allow_reserved = allow_reserved

193
falyx/select_file_action.py Normal file
View File

@ -0,0 +1,193 @@
from __future__ import annotations
import csv
import json
import xml.etree.ElementTree as ET
from enum import Enum
from pathlib import Path
from typing import Any
import toml
import yaml
from prompt_toolkit import PromptSession
from rich.console import Console
from rich.tree import Tree
from falyx.action import BaseAction
from falyx.context import ExecutionContext
from falyx.execution_registry import ExecutionRegistry as er
from falyx.hook_manager import HookType
from falyx.selection import (
SelectionOption,
prompt_for_selection,
render_selection_dict_table,
)
from falyx.themes.colors import OneColors
from falyx.utils import logger
class FileReturnType(Enum):
TEXT = "text"
PATH = "path"
JSON = "json"
TOML = "toml"
YAML = "yaml"
CSV = "csv"
XML = "xml"
class SelectFileAction(BaseAction):
"""
SelectFileAction allows users to select a file from a directory and return:
- file content (as text, JSON, CSV, etc.)
- or the file path itself.
Supported formats: text, json, yaml, toml, csv, xml.
Useful for:
- dynamically loading config files
- interacting with user-selected data
- chaining file contents into workflows
Args:
name (str): Name of the action.
directory (Path | str): Where to search for files.
title (str): Title of the selection menu.
columns (int): Number of columns in the selection menu.
prompt_message (str): Message to display when prompting for selection.
style (str): Style for the selection options.
suffix_filter (str | None): Restrict to certain file types.
return_type (FileReturnType): What to return (path, content, parsed).
console (Console | None): Console instance for output.
prompt_session (PromptSession | None): Prompt session for user input.
"""
def __init__(
self,
name: str,
directory: Path | str = ".",
*,
title: str = "Select a file",
columns: int = 3,
prompt_message: str = "Choose > ",
style: str = OneColors.WHITE,
suffix_filter: str | None = None,
return_type: FileReturnType = FileReturnType.PATH,
console: Console | None = None,
prompt_session: PromptSession | None = None,
):
super().__init__(name)
self.directory = Path(directory).resolve()
self.title = title
self.columns = columns
self.prompt_message = prompt_message
self.suffix_filter = suffix_filter
self.style = style
self.return_type = return_type
self.console = console or Console(color_system="auto")
self.prompt_session = prompt_session or PromptSession()
def get_options(self, files: list[Path]) -> dict[str, SelectionOption]:
value: Any
options = {}
for index, file in enumerate(files):
try:
if self.return_type == FileReturnType.TEXT:
value = file.read_text(encoding="UTF-8")
elif self.return_type == FileReturnType.PATH:
value = file
elif self.return_type == FileReturnType.JSON:
value = json.loads(file.read_text(encoding="UTF-8"))
elif self.return_type == FileReturnType.TOML:
value = toml.loads(file.read_text(encoding="UTF-8"))
elif self.return_type == FileReturnType.YAML:
value = yaml.safe_load(file.read_text(encoding="UTF-8"))
elif self.return_type == FileReturnType.CSV:
with open(file, newline="", encoding="UTF-8") as csvfile:
reader = csv.reader(csvfile)
value = list(reader)
elif self.return_type == FileReturnType.XML:
tree = ET.parse(file, parser=ET.XMLParser(encoding="UTF-8"))
root = tree.getroot()
value = ET.tostring(root, encoding="unicode")
else:
raise ValueError(f"Unsupported return type: {self.return_type}")
options[str(index)] = SelectionOption(
description=file.name, value=value, style=self.style
)
except Exception as error:
logger.warning("[ERROR] Failed to parse %s: %s", file.name, error)
return options
async def _run(self, *args, **kwargs) -> Any:
context = ExecutionContext(name=self.name, args=args, kwargs=kwargs, action=self)
context.start_timer()
try:
await self.hooks.trigger(HookType.BEFORE, context)
files = [
f
for f in self.directory.iterdir()
if f.is_file()
and (self.suffix_filter is None or f.suffix == self.suffix_filter)
]
if not files:
raise FileNotFoundError("No files found in directory.")
options = self.get_options(files)
table = render_selection_dict_table(self.title, options, self.columns)
key = await prompt_for_selection(
options.keys(),
table,
console=self.console,
prompt_session=self.prompt_session,
prompt_message=self.prompt_message,
)
result = options[key].value
context.result = result
await self.hooks.trigger(HookType.ON_SUCCESS, context)
return result
except Exception as error:
context.exception = error
await self.hooks.trigger(HookType.ON_ERROR, context)
raise
finally:
context.stop_timer()
await self.hooks.trigger(HookType.AFTER, context)
await self.hooks.trigger(HookType.ON_TEARDOWN, context)
er.record(context)
async def preview(self, parent: Tree | None = None):
label = f"[{OneColors.GREEN}]📁 SelectFilesAction[/] '{self.name}'"
tree = parent.add(label) if parent else Tree(label)
tree.add(f"[dim]Directory:[/] {str(self.directory)}")
tree.add(f"[dim]Suffix filter:[/] {self.suffix_filter or 'None'}")
tree.add(f"[dim]Return type:[/] {self.return_type}")
tree.add(f"[dim]Prompt:[/] {self.prompt_message}")
tree.add(f"[dim]Columns:[/] {self.columns}")
try:
files = list(self.directory.iterdir())
if self.suffix_filter:
files = [f for f in files if f.suffix == self.suffix_filter]
sample = files[:10]
file_list = tree.add("[dim]Files:[/]")
for f in sample:
file_list.add(f"[dim]{f.name}[/]")
if len(files) > 10:
file_list.add(f"[dim]... ({len(files) - 10} more)[/]")
except Exception as error:
tree.add(f"[bold red]⚠️ Error scanning directory: {error}[/]")
if not parent:
self.console.print(tree)
def __str__(self) -> str:
return (
f"SelectFilesAction(name={self.name!r}, dir={str(self.directory)!r}, "
f"suffix_filter={self.suffix_filter!r}, return_type={self.return_type})"
)

View File

@ -1 +1 @@
__version__ = "0.1.20"
__version__ = "0.1.21"

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "falyx"
version = "0.1.20"
version = "0.1.21"
description = "Reliable and introspectable async CLI action framework."
authors = ["Roland Thomas Jr <roland@rtj.dev>"]
license = "MIT"

View File

@ -28,7 +28,7 @@ async def test_process_action_executes_correctly():
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
action = ProcessAction(name="proc", func=slow_add, args=(2, 3))
action = ProcessAction(name="proc", action=slow_add, args=(2, 3))
result = await action()
assert result == 5
@ -41,6 +41,6 @@ async def test_process_action_rejects_unpickleable():
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
action = ProcessAction(name="proc_fail", func=unpickleable, args=(2,))
action = ProcessAction(name="proc_fail", action=unpickleable, args=(2,))
with pytest.raises(pickle.PicklingError, match="Can't pickle"):
await action()