feat(core): clone commands and actions when binding runtimes

Add clone support across Action types and Command so commands can be safely
registered or runner-bound without mutating the original instances.

- clone BaseAction implementations across simple, composite, IO, prompt, file,
  HTTP, process, and signal actions
- bind cloned commands in Falyx.add_command_from_command() and CommandRunner
- preserve local never_prompt settings when cloning actions
- rename shared runtime state from options to options_manager for consistency
- seed root and execution option namespaces consistently
- apply scoped root and namespace option overrides during routing and dispatch
- improve namespace completion by delegating option suggestions to FalyxParser
- enrich missing-value errors and error hints
This commit is contained in:
2026-06-07 13:04:35 -04:00
parent 8db7a9e6dc
commit efe3f5fd99
78 changed files with 9513 additions and 433 deletions

View File

@@ -0,0 +1,334 @@
import pytest
from falyx.action.action import Action
from falyx.action.action_group import ActionGroup
from falyx.action.chained_action import ChainedAction
from falyx.action.http_action import HTTPAction
from falyx.action.menu_action import MenuAction
from falyx.action.process_action import ProcessAction
from falyx.hook_manager import HookType
from falyx.menu import MenuOption, MenuOptionMap
from falyx.retry import RetryHandler, RetryPolicy
def _retry_hooks(action) -> list:
return [
hook
for hook in action.hooks._hooks[HookType.ON_ERROR]
if isinstance(getattr(hook, "__self__", None), RetryHandler)
]
def _non_retry_error_hooks(action) -> list:
return [
hook
for hook in action.hooks._hooks[HookType.ON_ERROR]
if not isinstance(getattr(hook, "__self__", None), RetryHandler)
]
def _before_hooks(action) -> list:
return list(action.hooks._hooks[HookType.BEFORE])
def test_action_group_clone_recursively_isolates_nested_action_graph():
nested_chain = ChainedAction(
name="nested-chain",
actions=[
Action("step-two", lambda: "two"),
Action("step-three", lambda: "three"),
],
)
original = ActionGroup(
name="group",
actions=[
Action("step-one", lambda: "one"),
nested_chain,
],
)
cloned = original.clone()
assert cloned is not original
assert cloned.actions is not original.actions
assert len(cloned.actions) == len(original.actions)
# Top-level children are cloned.
assert cloned.actions[0] is not original.actions[0]
assert cloned.actions[1] is not original.actions[1]
# Nested action graph is also cloned.
assert isinstance(cloned.actions[1], ChainedAction)
assert cloned.actions[1].actions is not original.actions[1].actions
for cloned_child, original_child in zip(
cloned.actions[1].actions,
original.actions[1].actions,
strict=True,
):
assert cloned_child is not original_child
assert cloned_child.name == original_child.name
# Mutating the clone does not mutate the original.
cloned.actions.append(Action("step-four", lambda: "four"))
assert len(cloned.actions) == 3
assert len(original.actions) == 2
cloned.actions[1].actions.append(Action("step-five", lambda: "five"))
assert len(cloned.actions[1].actions) == 3
assert len(original.actions[1].actions) == 2
def test_menu_action_clone_copies_menu_option_map_and_clones_contained_actions():
menu_options = MenuOptionMap(disable_reserved=True)
menu_options["A"] = MenuOption(
description="Alpha",
action=Action("alpha-action", lambda: "alpha"),
)
original = MenuAction(
name="main-menu",
menu_options=menu_options,
title="Main Menu",
)
cloned = original.clone()
assert cloned is not original
assert cloned.menu_options is not original.menu_options
assert cloned.menu_options["A"] is not original.menu_options["A"]
assert cloned.menu_options["A"].description == original.menu_options["A"].description
# Contained action should also be cloned.
assert cloned.menu_options["A"].action is not original.menu_options["A"].action
assert cloned.menu_options["A"].action.name == original.menu_options["A"].action.name
# Mutating the clone should not affect the original.
cloned.menu_options["A"].description = "Changed"
assert original.menu_options["A"].description == "Alpha"
cloned.menu_options["B"] = MenuOption(
description="Beta",
action=Action("beta-action", lambda: "beta"),
)
assert "B" in cloned.menu_options
assert "B" not in original.menu_options
def test_process_action_clone_does_not_reuse_runtime_only_executor_state():
original = ProcessAction(
name="proc",
action=lambda x: x + 1,
args=(1,),
kwargs={"y": 2},
)
original.executor = object()
cloned = original.clone()
assert cloned is not original
assert cloned.hooks is not original.hooks
assert cloned.args == original.args
assert cloned.kwargs == original.kwargs
assert cloned.executor is not original.executor
def test_http_action_clone_preserves_retry_policy_without_duplicating_spinner_hooks():
retry_policy = RetryPolicy(max_retries=3, delay=1.5, backoff=2.0)
retry_policy.enable_policy()
original = HTTPAction(
name="get-users",
method="GET",
url="https://example.com/api/users",
headers={"Authorization": "Bearer token"},
params={"page": 1},
retry_policy=retry_policy,
spinner=True,
)
before_count = len(original.hooks._hooks[HookType.BEFORE])
teardown_count = len(original.hooks._hooks[HookType.ON_TEARDOWN])
error_count = len(original.hooks._hooks[HookType.ON_ERROR])
cloned = original.clone()
assert cloned is not original
assert cloned.hooks is not original.hooks
assert cloned.retry_policy is not original.retry_policy
assert cloned.retry_policy.enabled is original.retry_policy.enabled
assert cloned.retry_policy.max_retries == original.retry_policy.max_retries
assert cloned.retry_policy.delay == original.retry_policy.delay
assert cloned.retry_policy.backoff == original.retry_policy.backoff
assert len(cloned.hooks._hooks[HookType.BEFORE]) == before_count
assert len(cloned.hooks._hooks[HookType.ON_TEARDOWN]) == teardown_count
assert len(cloned.hooks._hooks[HookType.ON_ERROR]) == error_count
@pytest.mark.asyncio
async def test_action_clone_registers_exactly_one_retry_hook():
async def flaky():
return "ok"
policy = RetryPolicy(max_retries=2, delay=0.1, backoff=2.0)
policy.enable_policy()
original = Action(
"flaky",
flaky,
retry_policy=policy,
)
cloned = original.clone()
original_retry_hooks = _retry_hooks(original)
cloned_retry_hooks = _retry_hooks(cloned)
assert len(original_retry_hooks) == 1
assert len(cloned_retry_hooks) == 1
assert cloned_retry_hooks[0] is not original_retry_hooks[0]
assert getattr(cloned_retry_hooks[0], "__self__", None) is not getattr(
original_retry_hooks[0], "__self__", None
)
def test_action_clone_preserves_non_retry_hooks_without_duplication():
calls = []
async def custom_error_hook(context):
calls.append(context.name)
original = Action("demo", lambda: "ok")
original.hooks.register(HookType.BEFORE, lambda context: None)
original.hooks.register(HookType.ON_ERROR, custom_error_hook)
cloned = original.clone()
assert len(_before_hooks(cloned)) == len(_before_hooks(original))
assert len(_non_retry_error_hooks(cloned)) == len(_non_retry_error_hooks(original))
assert cloned.hooks is not original.hooks
def test_action_clone_copies_retry_policy_without_sharing_it():
policy = RetryPolicy(max_retries=2, delay=0.25, backoff=3.0)
policy.enable_policy()
original = Action(
"demo",
lambda: "ok",
retry_policy=policy,
)
cloned = original.clone()
assert cloned.retry_policy is not original.retry_policy
assert cloned.retry_policy.enabled is original.retry_policy.enabled
assert cloned.retry_policy.max_retries == original.retry_policy.max_retries
assert cloned.retry_policy.delay == original.retry_policy.delay
assert cloned.retry_policy.backoff == original.retry_policy.backoff
cloned.retry_policy.max_retries = 9
assert original.retry_policy.max_retries == 2
@pytest.mark.asyncio
async def test_action_clone_retry_behavior_still_works_independently():
state = {"original": 0, "clone": 0}
async def flaky_original():
if state["original"] == 0:
state["original"] += 1
raise RuntimeError("boom")
return "original-ok"
async def flaky_clone():
if state["clone"] == 0:
state["clone"] += 1
raise RuntimeError("boom")
return "clone-ok"
policy = RetryPolicy(max_retries=1, delay=0.0, backoff=1.0)
policy.enable_policy()
original = Action("orig", flaky_original, retry_policy=policy)
cloned = original.clone()
cloned.action = flaky_clone
original_result = await original()
cloned_result = await cloned()
assert original_result == "original-ok"
assert cloned_result == "clone-ok"
assert state["original"] == 1
assert state["clone"] == 1
def test_http_action_clone_registers_exactly_one_retry_hook():
policy = RetryPolicy(max_retries=2, delay=0.1, backoff=2.0)
policy.enable_policy()
original = HTTPAction(
name="get-users",
method="GET",
url="https://example.com/api/users",
retry_policy=policy,
spinner=True,
)
cloned = original.clone()
original_retry_hooks = _retry_hooks(original)
cloned_retry_hooks = _retry_hooks(cloned)
assert len(original_retry_hooks) == 1
assert len(cloned_retry_hooks) == 1
assert cloned_retry_hooks[0] is not original_retry_hooks[0]
def test_http_action_clone_copies_retry_policy_without_sharing_it():
policy = RetryPolicy(max_retries=3, delay=1.5, backoff=2.0)
policy.enable_policy()
original = HTTPAction(
name="get-users",
method="GET",
url="https://example.com/api/users",
retry_policy=policy,
)
cloned = original.clone()
assert cloned.retry_policy is not original.retry_policy
assert cloned.retry_policy.enabled is original.retry_policy.enabled
assert cloned.retry_policy.max_retries == original.retry_policy.max_retries
assert cloned.retry_policy.delay == original.retry_policy.delay
assert cloned.retry_policy.backoff == original.retry_policy.backoff
def test_http_action_clone_does_not_duplicate_spinner_hooks():
policy = RetryPolicy(max_retries=1, delay=0.0, backoff=1.0)
policy.enable_policy()
original = HTTPAction(
name="get-users",
method="GET",
url="https://example.com/api/users",
retry_policy=policy,
spinner=True,
)
cloned = original.clone()
assert len(cloned.hooks._hooks[HookType.BEFORE]) == len(
original.hooks._hooks[HookType.BEFORE]
)
assert len(cloned.hooks._hooks[HookType.ON_TEARDOWN]) == len(
original.hooks._hooks[HookType.ON_TEARDOWN]
)

View File

@@ -0,0 +1,430 @@
from __future__ import annotations
import csv
import json
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any
import pytest
import toml
import yaml
from rich.tree import Tree
from falyx.action.action_types import FileType
from falyx.action.save_file_action import SaveFileAction
from falyx.hook_manager import HookType
class CaptureConsole:
def __init__(self) -> None:
self.printed: list[tuple[tuple[Any, ...], dict[str, Any]]] = []
def print(self, *args: Any, **kwargs: Any) -> None:
self.printed.append((args, kwargs))
def make_action(file_path: Path | str | None, **overrides: Any) -> SaveFileAction:
defaults: dict[str, Any] = {
"name": "SaveOutput",
"file_path": file_path,
}
defaults.update(overrides)
return SaveFileAction(**defaults)
def register_lifecycle_hooks(action: SaveFileAction) -> list[tuple[HookType, Any]]:
calls: list[tuple[HookType, Any]] = []
def make_hook(hook_type: HookType):
def hook(context: Any) -> None:
calls.append((hook_type, context))
return hook
for hook_type in HookType:
action.hooks.register(hook_type, make_hook(hook_type))
return calls
def hook_types(calls: list[tuple[HookType, Any]]) -> list[HookType]:
return [hook_type for hook_type, _ in calls]
def test_init_normalizes_configuration_and_string_file_type(tmp_path: Path) -> None:
target = tmp_path / "output.json"
action = SaveFileAction(
name="SaveJson",
file_path=str(target),
file_type="json",
mode="a",
encoding="utf-8",
data={"name": "falyx"},
overwrite=False,
create_dirs=False,
inject_last_result=True,
inject_into="payload",
never_prompt=True,
)
assert action.name == "SaveJson"
assert action.file_path == target
assert action.file_type == FileType.JSON
assert action.mode == "a"
assert action.encoding == "utf-8"
assert action.data == {"name": "falyx"}
assert action.overwrite is False
assert action.create_dirs is False
assert action.inject_last_result is True
assert action.inject_into == "payload"
assert action.local_never_prompt is True
assert "SaveFileAction" in str(action)
assert "output.json" in str(action)
def test_file_path_property_coerces_string_path_and_none(tmp_path: Path) -> None:
action = make_action(None)
assert action.file_path is None
target = tmp_path / "later.txt"
action.file_path = str(target)
assert action.file_path == target
action.file_path = target
assert action.file_path == target
def test_file_path_rejects_unsupported_values(tmp_path: Path) -> None:
action = make_action(tmp_path / "out.txt")
with pytest.raises(TypeError, match="file_path must be a string or Path object"):
action.file_path = 123 # type: ignore[assignment]
def test_get_infer_target_disables_signature_inference(tmp_path: Path) -> None:
action = make_action(tmp_path / "out.txt")
assert action.get_infer_target() == (None, None)
def test_dict_to_xml_serializes_nested_dicts_lists_and_scalars(tmp_path: Path) -> None:
action = make_action(tmp_path / "out.xml", file_type=FileType.XML)
root = ET.Element("root")
action._dict_to_xml(
{
"name": "falyx",
"metadata": {"version": "0.2.0"},
"tags": ["cli", "framework"],
"commands": [{"name": "run"}, {"name": "help"}],
},
root,
)
assert root.findtext("name") == "falyx"
assert root.find("metadata") is not None
assert root.find("metadata/version") is not None
assert root.findtext("metadata/version") == "0.2.0"
assert [element.text for element in root.findall("tags")] == ["cli", "framework"]
assert [element.findtext("name") for element in root.findall("commands")] == [
"run",
"help",
]
@pytest.mark.asyncio
async def test_save_file_requires_file_path_before_saving() -> None:
action = make_action(None, data="hello")
with pytest.raises(ValueError, match="file_path must be set"):
await action.save_file("hello")
@pytest.mark.asyncio
async def test_save_file_refuses_to_overwrite_existing_file_when_disabled(
tmp_path: Path,
) -> None:
target = tmp_path / "existing.txt"
target.write_text("original", encoding="UTF-8")
action = make_action(target, overwrite=False)
with pytest.raises(FileExistsError, match="File already exists"):
await action.save_file("replacement")
assert target.read_text(encoding="UTF-8") == "original"
@pytest.mark.asyncio
async def test_save_file_requires_parent_directory_when_create_dirs_is_disabled(
tmp_path: Path,
) -> None:
target = tmp_path / "missing" / "out.txt"
action = make_action(target, create_dirs=False)
with pytest.raises(FileNotFoundError, match="Directory does not exist"):
await action.save_file("hello")
@pytest.mark.asyncio
async def test_save_file_creates_missing_parent_directories(tmp_path: Path) -> None:
target = tmp_path / "nested" / "out.txt"
action = make_action(target, file_type=FileType.TEXT, create_dirs=True)
await action.save_file("hello")
assert target.read_text(encoding="UTF-8") == "hello"
@pytest.mark.asyncio
@pytest.mark.parametrize(
("file_type", "filename", "data"),
[
(FileType.TEXT, "note.txt", "hello"),
(FileType.JSON, "data.json", {"name": "falyx", "count": 2}),
(FileType.YAML, "data.yaml", {"name": "falyx", "enabled": True}),
(FileType.TOML, "data.toml", {"name": "falyx", "count": 2}),
(FileType.CSV, "rows.csv", [["name", "count"], ["falyx", "2"]]),
(FileType.TSV, "rows.tsv", [["name", "count"], ["falyx", "2"]]),
(
FileType.XML,
"data.xml",
{
"name": "falyx",
"metadata": {"version": "0.2.0"},
"tags": ["cli", "framework"],
},
),
],
)
async def test_save_file_writes_supported_file_types(
tmp_path: Path,
file_type: FileType,
filename: str,
data: Any,
) -> None:
target = tmp_path / filename
action = make_action(target, file_type=file_type)
await action.save_file(data)
if file_type == FileType.TEXT:
assert target.read_text(encoding="UTF-8") == data
elif file_type == FileType.JSON:
assert json.loads(target.read_text(encoding="UTF-8")) == data
elif file_type == FileType.YAML:
assert yaml.safe_load(target.read_text(encoding="UTF-8")) == data
elif file_type == FileType.TOML:
assert toml.loads(target.read_text(encoding="UTF-8")) == data
elif file_type == FileType.CSV:
with target.open(newline="", encoding="UTF-8") as file:
assert list(csv.reader(file)) == data
elif file_type == FileType.TSV:
with target.open(newline="", encoding="UTF-8") as file:
assert list(csv.reader(file, delimiter="\t")) == data
elif file_type == FileType.XML:
root = ET.parse(target).getroot()
assert root.tag == "root"
assert root.findtext("name") == "falyx"
assert root.findtext("metadata/version") == "0.2.0"
assert [element.text for element in root.findall("tags")] == [
"cli",
"framework",
]
@pytest.mark.asyncio
@pytest.mark.parametrize("file_type", [FileType.CSV, FileType.TSV])
@pytest.mark.parametrize(
"data",
[
{"name": "falyx"},
["name", "count"],
[["name", "count"], "not-a-row"],
],
)
async def test_save_file_requires_list_of_lists_for_delimited_formats(
tmp_path: Path,
file_type: FileType,
data: Any,
) -> None:
target = tmp_path / "rows.data"
action = make_action(target, file_type=file_type)
with pytest.raises(ValueError, match="requires a list of lists"):
await action.save_file(data)
@pytest.mark.asyncio
async def test_save_file_requires_dict_for_xml(tmp_path: Path) -> None:
target = tmp_path / "data.xml"
action = make_action(target, file_type=FileType.XML)
with pytest.raises(
ValueError, match="XML file type requires data to be a dictionary"
):
await action.save_file(["not", "a", "dict"])
@pytest.mark.asyncio
async def test_save_file_raises_for_unsupported_internal_file_type(
tmp_path: Path,
) -> None:
target = tmp_path / "data.out"
action = make_action(target, file_type=FileType.TEXT)
action._file_type = object() # Force the defensive unsupported-type branch.
with pytest.raises(ValueError, match="Unsupported file type"):
await action.save_file("hello")
@pytest.mark.asyncio
async def test_save_file_reraises_write_errors(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
target = tmp_path / "out.txt"
action = make_action(target, file_type=FileType.TEXT)
def fake_write_text(self: Path, data: str, *, encoding: str | None = None) -> int:
raise OSError("disk is unavailable")
monkeypatch.setattr(Path, "write_text", fake_write_text)
with pytest.raises(OSError, match="disk is unavailable"):
await action.save_file("hello")
@pytest.mark.asyncio
async def test_run_saves_configured_data_and_triggers_success_lifecycle(
tmp_path: Path,
) -> None:
target = tmp_path / "out.txt"
action = make_action(target, file_type=FileType.TEXT, data="hello")
calls = register_lifecycle_hooks(action)
result = await action("positional", ignored="kwarg")
assert result == str(target)
assert target.read_text(encoding="UTF-8") == "hello"
assert hook_types(calls) == [
HookType.BEFORE,
HookType.ON_SUCCESS,
HookType.AFTER,
HookType.ON_TEARDOWN,
]
assert calls[0][1].args == ("positional",)
assert calls[0][1].kwargs == {"ignored": "kwarg"}
assert calls[0][1].action is action
@pytest.mark.asyncio
async def test_run_uses_data_from_kwargs_when_no_static_data_is_configured(
tmp_path: Path,
) -> None:
target = tmp_path / "out.txt"
action = make_action(target, file_type=FileType.TEXT, data=None)
result = await action(data="from kwargs")
assert result == str(target)
assert target.read_text(encoding="UTF-8") == "from kwargs"
@pytest.mark.asyncio
async def test_run_triggers_error_lifecycle_and_reraises(tmp_path: Path) -> None:
action = make_action(None, data="hello")
calls = register_lifecycle_hooks(action)
with pytest.raises(ValueError, match="file_path must be set"):
await action()
assert hook_types(calls) == [
HookType.BEFORE,
HookType.ON_ERROR,
HookType.AFTER,
HookType.ON_TEARDOWN,
]
assert isinstance(calls[1][1].exception, ValueError)
@pytest.mark.asyncio
async def test_preview_prints_tree_for_existing_file_when_overwrite_enabled(
tmp_path: Path,
) -> None:
target = tmp_path / "out.txt"
target.write_text("existing", encoding="UTF-8")
action = make_action(target, file_type=FileType.TEXT, overwrite=True)
action.console = CaptureConsole()
await action.preview()
assert len(action.console.printed) == 1
printed_tree = action.console.printed[0][0][0]
assert isinstance(printed_tree, Tree)
@pytest.mark.asyncio
async def test_preview_prints_tree_for_existing_file_when_overwrite_disabled(
tmp_path: Path,
) -> None:
target = tmp_path / "out.txt"
target.write_text("existing", encoding="UTF-8")
action = make_action(target, file_type=FileType.TEXT, overwrite=False)
action.console = CaptureConsole()
await action.preview()
assert len(action.console.printed) == 1
printed_tree = action.console.printed[0][0][0]
assert isinstance(printed_tree, Tree)
@pytest.mark.asyncio
async def test_preview_adds_to_existing_parent_without_printing(tmp_path: Path) -> None:
target = tmp_path / "out.txt"
action = make_action(target, file_type=FileType.JSON)
action.console = CaptureConsole()
parent = Tree("root")
await action.preview(parent=parent)
assert action.console.printed == []
assert len(parent.children) == 1
def test_clone_preserves_configuration_but_returns_distinct_action(
tmp_path: Path,
) -> None:
target = tmp_path / "out.json"
action = make_action(
target,
file_type=FileType.JSON,
mode="a",
encoding="utf-8",
data={"name": "falyx"},
overwrite=False,
create_dirs=False,
inject_last_result=True,
inject_into="payload",
never_prompt=True,
)
clone = action.clone()
assert clone is not action
assert clone.name == action.name
assert clone.file_path == action.file_path
assert clone.file_type == action.file_type
assert clone.mode == action.mode
assert clone.encoding == action.encoding
assert clone.data == action.data
assert clone.overwrite is action.overwrite
assert clone.create_dirs is action.create_dirs
assert clone.inject_last_result is action.inject_last_result
assert clone.inject_into == action.inject_into
assert clone.local_never_prompt is True

View File

@@ -1,7 +1,83 @@
import pytest
from __future__ import annotations
from falyx.action import SelectionAction
from falyx.selection import SelectionOption
from typing import Any
import pytest
from rich.tree import Tree
import falyx.action.selection_action as selection_action_module
from falyx.action.action_types import SelectionReturnType
from falyx.action.selection_action import SelectionAction
from falyx.hook_manager import HookType
from falyx.selection import SelectionOption, SelectionOptionMap
from falyx.signals import CancelSignal
class DummyPromptSession:
pass
class CaptureConsole:
def __init__(self) -> None:
self.printed: list[tuple[tuple[Any, ...], dict[str, Any]]] = []
def print(self, *args: Any, **kwargs: Any) -> None:
self.printed.append((args, kwargs))
class FakeSharedContext:
def __init__(self, value: Any) -> None:
self.value = value
def last_result(self) -> Any:
return self.value
class SizedButUnsupportedSelections:
def __len__(self) -> int:
return 0
def make_action(selections: Any | None = None, **overrides: Any) -> SelectionAction:
defaults: dict[str, Any] = {
"name": "ChooseThing",
"selections": (
selections if selections is not None else ["alpha", "beta", "gamma"]
),
"prompt_session": DummyPromptSession(),
}
defaults.update(overrides)
return SelectionAction(**defaults)
def make_option_map_action(**overrides: Any) -> SelectionAction:
return make_action(
{
"0": SelectionOption("Development", "dev"),
"1": SelectionOption("Production", "prod"),
"2": SelectionOption("Staging", "stage"),
},
**overrides,
)
def register_lifecycle_hooks(action: SelectionAction) -> list[tuple[HookType, Any]]:
calls: list[tuple[HookType, Any]] = []
def make_hook(hook_type: HookType):
def hook(context: Any) -> None:
calls.append((hook_type, context))
return hook
for hook_type in HookType:
action.hooks.register(hook_type, make_hook(hook_type))
return calls
def hook_types(calls: list[tuple[HookType, Any]]) -> list[HookType]:
return [hook_type for hook_type, _ in calls]
@pytest.mark.asyncio
@@ -285,3 +361,586 @@ async def test_selection_prompt_map_never_prompt_by_value_wildcard():
result = await action()
assert result == ["Beta Service", "Alpha Service"]
def test_init_normalizes_list_tuple_set_and_basic_configuration() -> None:
session = DummyPromptSession()
tuple_action = SelectionAction(
name="TupleChoice",
selections=("red", "blue"),
title="Colors",
columns=2,
prompt_message="[bold]Pick >[/] ",
default_selection="1",
number_selections=1,
separator=";",
allow_duplicates=True,
return_type="value",
prompt_session=session,
never_prompt=True,
show_table=False,
)
assert tuple_action.selections == ["red", "blue"]
assert tuple_action.return_type is SelectionReturnType.VALUE
assert tuple_action.title == "Colors"
assert tuple_action.columns == 2
assert tuple_action.default_selection == "1"
assert tuple_action.separator == ";"
assert tuple_action.allow_duplicates is True
assert tuple_action.prompt_session is session
assert tuple_action.local_never_prompt is True
assert tuple_action.show_table is False
set_action = make_action({"red", "blue"})
assert sorted(set_action.selections) == ["blue", "red"]
def test_init_converts_plain_dict_to_selection_option_map() -> None:
action = make_action({"dev": "Development", "prod": "Production"})
assert isinstance(action.selections, SelectionOptionMap)
assert list(action.selections) == ["0", "1"]
assert action.selections["0"] == SelectionOption("dev", "Development")
assert action.selections["1"] == SelectionOption("prod", "Production")
def test_init_preserves_selection_option_map_values() -> None:
action = make_action(
{
"D": SelectionOption("Development", "dev", style="green"),
"P": SelectionOption("Production", "prod", style="red"),
}
)
assert isinstance(action.selections, SelectionOptionMap)
assert action.selections["D"].description == "Development"
assert action.selections["P"].value == "prod"
@pytest.mark.parametrize("number_selections", [1, 2, "*"])
def test_number_selections_accepts_positive_ints_and_star(
number_selections: int | str,
) -> None:
action = make_action(number_selections=number_selections)
assert action.number_selections == number_selections
@pytest.mark.parametrize("number_selections", [0, -1, "many", object()])
def test_number_selections_rejects_invalid_values(number_selections: Any) -> None:
action = make_action()
with pytest.raises(ValueError, match="number_selections"):
action.number_selections = number_selections
@pytest.mark.parametrize(
("selections", "error_type", "match"),
[
({1: SelectionOption("One", 1)}, ValueError, "Invalid dictionary format"),
(123, TypeError, "selections"),
],
)
def test_selections_setter_rejects_invalid_inputs(
selections: Any,
error_type: type[BaseException],
match: str,
) -> None:
with pytest.raises(error_type, match=match):
make_action(selections)
def test_find_cancel_key_returns_numeric_gap_for_dict_and_next_index_for_list() -> None:
dict_action = make_action(
{
"0": SelectionOption("Zero", 0),
"2": SelectionOption("Two", 2),
}
)
list_action = make_action(["zero", "one"])
assert dict_action._find_cancel_key() == "1"
assert list_action._find_cancel_key() == "2"
def test_cancel_key_setter_rejects_non_string_values() -> None:
action = make_action()
with pytest.raises(TypeError, match="Cancel key must be a string"):
action.cancel_key = 1 # type: ignore[assignment]
def test_cancel_key_setter_rejects_existing_dict_key() -> None:
action = make_action({"A": SelectionOption("Alpha", "alpha")})
with pytest.raises(
ValueError, match="Cancel key cannot be one of the selection keys"
):
action.cancel_key = "A"
@pytest.mark.parametrize("cancel_key", ["x", "3"])
def test_cancel_key_setter_rejects_invalid_list_cancel_key(cancel_key: str) -> None:
action = make_action(["alpha", "beta"])
with pytest.raises(ValueError, match="cancel_key must be a digit"):
action.cancel_key = cancel_key
def test_cancel_formatter_marks_cancel_key_and_formats_regular_items() -> None:
action = make_action(["alpha", "beta"])
action.cancel_key = "2"
assert "Cancel" in action.cancel_formatter(2, "Cancel")
assert action.cancel_formatter(1, "beta").endswith("beta")
def test_get_infer_target_disables_signature_inference() -> None:
action = make_action()
assert action.get_infer_target() == (None, None)
@pytest.mark.parametrize(
("return_type", "keys", "expected"),
[
(SelectionReturnType.KEY, "0", "0"),
(SelectionReturnType.KEY, ["0", "2"], ["0", "2"]),
(SelectionReturnType.VALUE, "1", "prod"),
(SelectionReturnType.VALUE, ["0", "2"], ["dev", "stage"]),
(SelectionReturnType.DESCRIPTION, "0", "Development"),
(
SelectionReturnType.DESCRIPTION,
["0", "2"],
["Development", "Staging"],
),
(
SelectionReturnType.DESCRIPTION_VALUE,
"1",
{"Production": "prod"},
),
(
SelectionReturnType.DESCRIPTION_VALUE,
["0", "2"],
{"Development": "dev", "Staging": "stage"},
),
],
)
def test_get_result_from_keys_returns_configured_shape(
return_type: SelectionReturnType,
keys: str | list[str],
expected: Any,
) -> None:
action = make_option_map_action(return_type=return_type)
assert action._get_result_from_keys(keys) == expected
@pytest.mark.parametrize("keys", ["0", ["0", "1"]])
def test_get_result_from_keys_returns_items_mapping(keys: str | list[str]) -> None:
action = make_option_map_action(return_type=SelectionReturnType.ITEMS)
result = action._get_result_from_keys(keys)
assert isinstance(result, dict)
assert set(result) == ({keys} if isinstance(keys, str) else set(keys))
assert all(isinstance(option, SelectionOption) for option in result.values())
def test_get_result_from_keys_requires_dict_selections() -> None:
action = make_action(["alpha", "beta"])
with pytest.raises(TypeError, match="Selections must be a dictionary"):
action._get_result_from_keys("0")
def test_get_result_from_keys_rejects_unsupported_return_type() -> None:
action = make_option_map_action()
action.return_type = object() # Force defensive branch unreachable through __init__.
with pytest.raises(ValueError, match="Unsupported return type"):
action._get_result_from_keys("0")
@pytest.mark.asyncio
@pytest.mark.parametrize(
("maybe_result", "expected"),
[
("1", "1"),
("prod", "1"),
("Production", "1"),
],
)
async def test_resolve_single_default_maps_dict_key_value_and_description(
maybe_result: str,
expected: str,
) -> None:
action = make_option_map_action()
assert await action._resolve_single_default(maybe_result) == expected
@pytest.mark.asyncio
@pytest.mark.parametrize(
("maybe_result", "expected"),
[
("1", "1"),
("beta", "1"),
("missing", ""),
],
)
async def test_resolve_single_default_maps_list_index_or_value(
maybe_result: str,
expected: str,
) -> None:
action = make_action(["alpha", "beta"])
assert await action._resolve_single_default(maybe_result) == expected
@pytest.mark.asyncio
async def test_resolve_effective_default_uses_first_value_for_single_selection_defaults() -> (
None
):
action = make_action(["alpha", "beta"], default_selection=["beta"])
assert await action._resolve_effective_default() == "1"
@pytest.mark.asyncio
async def test_resolve_effective_default_uses_first_last_result_for_single_selection() -> (
None
):
action = make_action(["alpha", "beta"])
action.shared_context = FakeSharedContext(["beta"])
assert await action._resolve_effective_default() == "1"
@pytest.mark.asyncio
async def test_resolve_effective_default_joins_multi_selection_defaults() -> None:
action = make_action(
["alpha", "beta", "gamma"],
default_selection=["alpha", "gamma"],
number_selections=2,
)
assert await action._resolve_effective_default() == "0,2"
@pytest.mark.asyncio
async def test_resolve_effective_default_joins_multi_selection_last_result() -> None:
action = make_action(["alpha", "beta", "gamma"], number_selections=2)
action.shared_context = FakeSharedContext(["alpha", "gamma"])
assert await action._resolve_effective_default() == "0,2"
@pytest.mark.asyncio
async def test_resolve_effective_default_allows_unbounded_multi_selection_last_result() -> (
None
):
action = make_action(["alpha", "beta", "gamma"], number_selections="*")
action.shared_context = FakeSharedContext(["alpha", "beta", "gamma"])
assert await action._resolve_effective_default() == "0,1,2"
@pytest.mark.asyncio
async def test_resolve_effective_default_rejects_default_length_mismatch() -> None:
action = make_action(
["alpha", "beta", "gamma"],
default_selection=["alpha"],
number_selections=2,
)
with pytest.raises(ValueError, match="default_selection has a different length"):
await action._resolve_effective_default()
@pytest.mark.asyncio
async def test_resolve_effective_default_rejects_last_result_length_mismatch() -> None:
action = make_action(["alpha", "beta", "gamma"], number_selections=2)
action.shared_context = FakeSharedContext(["alpha"])
with pytest.raises(ValueError, match="last_result has a different length"):
await action._resolve_effective_default()
@pytest.mark.asyncio
async def test_resolve_effective_default_warns_when_injected_result_is_unusable(
caplog: pytest.LogCaptureFixture,
) -> None:
action = make_action(
["alpha", "beta"],
inject_last_result=True,
number_selections=2,
)
action.shared_context = FakeSharedContext("missing")
assert await action._resolve_effective_default() == ""
assert "Injected last result" in caplog.text
@pytest.mark.asyncio
async def test_run_list_headless_single_selection_uses_default() -> None:
action = make_action(["alpha", "beta"], never_prompt=True, default_selection="1")
result = await action()
assert result == "beta"
@pytest.mark.asyncio
async def test_run_list_headless_multi_selection_uses_default_list() -> None:
action = make_action(
["alpha", "beta", "gamma"],
never_prompt=True,
default_selection=["alpha", "gamma"],
number_selections=2,
)
result = await action()
assert result == ["alpha", "gamma"]
@pytest.mark.asyncio
async def test_run_dict_headless_single_selection_returns_value() -> None:
action = make_option_map_action(never_prompt=True, default_selection="1")
result = await action()
assert result == "prod"
@pytest.mark.asyncio
async def test_run_dict_headless_multi_selection_returns_configured_shape() -> None:
action = make_option_map_action(
never_prompt=True,
default_selection=["0", "2"],
number_selections=2,
return_type=SelectionReturnType.DESCRIPTION_VALUE,
)
result = await action()
assert result == {"Development": "dev", "Staging": "stage"}
@pytest.mark.asyncio
async def test_run_list_interactive_uses_prompt_for_index(
monkeypatch: pytest.MonkeyPatch,
) -> None:
action = make_action(["alpha", "beta"], never_prompt=False, show_table=False)
async def fake_prompt_for_index(*args: Any, **kwargs: Any) -> int:
assert kwargs["prompt_session"] is action.prompt_session
assert kwargs["show_table"] is False
assert kwargs["cancel_key"] == "2"
return 1
monkeypatch.setattr(
selection_action_module, "prompt_for_index", fake_prompt_for_index
)
result = await action()
assert result == "beta"
@pytest.mark.asyncio
async def test_run_dict_interactive_uses_prompt_for_selection(
monkeypatch: pytest.MonkeyPatch,
) -> None:
action = make_option_map_action(never_prompt=False, show_table=False)
async def fake_prompt_for_selection(*args: Any, **kwargs: Any) -> str:
assert kwargs["prompt_session"] is action.prompt_session
assert kwargs["show_table"] is False
assert kwargs["cancel_key"] == "3"
return "2"
monkeypatch.setattr(
selection_action_module,
"prompt_for_selection",
fake_prompt_for_selection,
)
result = await action()
assert result == "stage"
@pytest.mark.asyncio
async def test_run_raises_when_never_prompt_has_no_effective_default() -> None:
action = make_action(["alpha", "beta"], never_prompt=True)
with pytest.raises(ValueError, match="never_prompt"):
await action()
@pytest.mark.asyncio
async def test_run_list_cancel_triggers_error_and_teardown_hooks(
monkeypatch: pytest.MonkeyPatch,
) -> None:
action = make_action(["alpha", "beta"], never_prompt=True, default_selection="0")
calls = register_lifecycle_hooks(action)
async def fake_resolve_effective_default() -> str:
return "4"
monkeypatch.setattr(
action, "_resolve_effective_default", fake_resolve_effective_default
)
with pytest.raises(IndexError):
await action()
assert HookType.BEFORE in hook_types(calls)
assert HookType.ON_ERROR in hook_types(calls)
assert HookType.AFTER in hook_types(calls)
assert HookType.ON_TEARDOWN in hook_types(calls)
error_contexts = [
context for hook_type, context in calls if hook_type is HookType.ON_ERROR
]
assert isinstance(error_contexts[0].exception, IndexError)
@pytest.mark.asyncio
async def test_run_dict_cancel_triggers_cancel_signal(
monkeypatch: pytest.MonkeyPatch,
) -> None:
action = make_option_map_action(never_prompt=True, default_selection="0")
async def fake_resolve_effective_default() -> str:
return "3"
monkeypatch.setattr(
action, "_resolve_effective_default", fake_resolve_effective_default
)
with pytest.raises(CancelSignal):
await action()
@pytest.mark.asyncio
async def test_run_unsupported_selection_storage_triggers_error_lifecycle(
monkeypatch: pytest.MonkeyPatch,
) -> None:
action = make_action(["alpha"], never_prompt=False)
action._selections = SizedButUnsupportedSelections() # type: ignore[assignment]
calls = register_lifecycle_hooks(action)
async def fake_resolve_effective_default() -> str:
return ""
monkeypatch.setattr(
action, "_resolve_effective_default", fake_resolve_effective_default
)
with pytest.raises(TypeError, match="selections"):
await action()
assert HookType.ON_ERROR in hook_types(calls)
error_contexts = [
context for hook_type, context in calls if hook_type is HookType.ON_ERROR
]
assert isinstance(error_contexts[0].exception, TypeError)
@pytest.mark.asyncio
async def test_run_success_triggers_success_after_and_teardown_hooks() -> None:
action = make_action(["alpha", "beta"], never_prompt=True, default_selection="0")
calls = register_lifecycle_hooks(action)
result = await action()
assert result == "alpha"
assert hook_types(calls).count(HookType.BEFORE) == 1
assert hook_types(calls).count(HookType.ON_SUCCESS) == 1
assert hook_types(calls).count(HookType.AFTER) == 1
assert hook_types(calls).count(HookType.ON_TEARDOWN) == 1
success_contexts = [
context for hook_type, context in calls if hook_type is HookType.ON_SUCCESS
]
assert success_contexts[0].result == "alpha"
@pytest.mark.asyncio
async def test_preview_prints_tree_when_no_parent() -> None:
action = make_option_map_action(default_selection="1", never_prompt=True)
console = CaptureConsole()
action.console = console # type: ignore[assignment]
await action.preview()
assert len(console.printed) == 1
assert "SelectionAction" in str(console.printed[0][0][0].label)
@pytest.mark.asyncio
async def test_preview_adds_to_parent_when_parent_is_provided() -> None:
action = make_action(["alpha", "beta"], default_selection="0")
parent = Tree("Root")
console = CaptureConsole()
action.console = console # type: ignore[assignment]
await action.preview(parent=parent)
assert console.printed == []
assert len(parent.children) == 1
assert "SelectionAction" in str(parent.children[0].label)
def test_str_includes_action_configuration() -> None:
action = make_action(["alpha", "beta"], return_type=SelectionReturnType.KEY)
text = str(action)
assert "SelectionAction" in text
assert "ChooseThing" in text
assert "KEY" in text or "key" in text
def test_clone_copies_selection_action_configuration() -> None:
session = DummyPromptSession()
action = SelectionAction(
name="CloneMe",
selections={"A": SelectionOption("Alpha", "alpha", style="green")},
title="Letters",
columns=3,
prompt_message="Choose letter > ",
default_selection="A",
number_selections="*",
separator=";",
allow_duplicates=True,
inject_last_result=True,
inject_into="choice",
return_type=SelectionReturnType.DESCRIPTION,
prompt_session=session,
never_prompt=True,
show_table=False,
)
clone = action.clone()
assert clone is not action
assert clone.name == action.name
assert clone.title == action.title
assert clone.columns == action.columns
assert clone.prompt_message == action.prompt_message
assert clone.default_selection == action.default_selection
assert clone.number_selections == action.number_selections
assert clone.separator == action.separator
assert clone.allow_duplicates == action.allow_duplicates
assert clone.inject_last_result is True
assert clone.inject_into == "choice"
assert clone.return_type is SelectionReturnType.DESCRIPTION
assert clone.prompt_session is session
assert clone.local_never_prompt is True
assert clone.show_table is False
assert clone.selections is not action.selections
assert clone.selections["A"].description == "Alpha"

View File

@@ -0,0 +1,598 @@
from __future__ import annotations
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any
import pytest
import toml
import yaml
from rich.tree import Tree
import falyx.action.select_file_action as select_file_module
from falyx.action.action_types import FileType
from falyx.action.select_file_action import SelectFileAction
from falyx.hook_manager import HookType
from falyx.selection import SelectionOption
from falyx.signals import CancelSignal
class DummyPromptSession:
pass
class CaptureConsole:
def __init__(self) -> None:
self.printed: list[tuple[tuple[Any, ...], dict[str, Any]]] = []
def print(self, *args: Any, **kwargs: Any) -> None:
self.printed.append((args, kwargs))
def make_action(directory: Path, **overrides: Any) -> SelectFileAction:
defaults: dict[str, Any] = {
"name": "ChooseFile",
"directory": directory,
"prompt_session": DummyPromptSession(),
}
defaults.update(overrides)
return SelectFileAction(**defaults)
def write_sample_files(directory: Path) -> dict[str, Path]:
paths = {
"text": directory / "note.txt",
"json": directory / "config.json",
"yaml": directory / "config.yaml",
"toml": directory / "config.toml",
"csv": directory / "rows.csv",
"tsv": directory / "rows.tsv",
"xml": directory / "doc.xml",
}
paths["text"].write_text("hello\n", encoding="UTF-8")
paths["json"].write_text('{"name": "falyx", "count": 2}', encoding="UTF-8")
paths["yaml"].write_text("name: falyx\nenabled: true\n", encoding="UTF-8")
paths["toml"].write_text('name = "falyx"\ncount = 2\n', encoding="UTF-8")
paths["csv"].write_text("name,count\nfalyx,2\n", encoding="UTF-8")
paths["tsv"].write_text("name\tcount\nfalyx\t2\n", encoding="UTF-8")
paths["xml"].write_text("<root><name>falyx</name></root>", encoding="UTF-8")
return paths
def register_lifecycle_hooks(action: SelectFileAction) -> list[tuple[HookType, Any]]:
calls: list[tuple[HookType, Any]] = []
def make_hook(hook_type: HookType):
def hook(context):
calls.append((hook_type, context))
return hook
for hook_type in HookType:
action.hooks.register(hook_type, make_hook(hook_type))
return calls
def hook_types(calls: list[tuple[HookType, Any]]) -> list[HookType]:
return [hook_type for hook_type, _ in calls]
def test_init_normalizes_configuration_and_string_return_type(tmp_path: Path) -> None:
session = DummyPromptSession()
action = SelectFileAction(
"ChooseConfig",
tmp_path,
title="Configs",
columns=4,
prompt_message="[bold]Pick >[/] ",
style="green",
suffix_filter=".json",
return_type="json",
encoding="utf-8",
number_selections="*",
separator=";",
allow_duplicates=True,
prompt_session=session,
never_prompt=True,
)
assert action.name == "ChooseConfig"
assert action.directory == tmp_path.resolve()
assert action.title == "Configs"
assert action.columns == 4
assert action.suffix_filter == ".json"
assert action.return_type == FileType.JSON
assert action.encoding == "utf-8"
assert action.number_selections == "*"
assert action.separator == ";"
assert action.allow_duplicates is True
assert action.prompt_session is session
assert action.local_never_prompt is True
assert "ChooseConfig" in str(action)
assert ".json" in str(action)
@pytest.mark.parametrize("number_selections", [1, 2, "*"])
def test_number_selections_accepts_positive_ints_and_star(
tmp_path: Path,
number_selections: int | str,
) -> None:
action = make_action(tmp_path, number_selections=number_selections)
assert action.number_selections == number_selections
@pytest.mark.parametrize("number_selections", [0, -1, "many", object()])
def test_number_selections_rejects_invalid_values(
tmp_path: Path,
number_selections: Any,
) -> None:
action = make_action(tmp_path)
with pytest.raises(ValueError, match="number_selections"):
action.number_selections = number_selections
def test_get_options_uses_numeric_keys_and_selection_options(tmp_path: Path) -> None:
first = tmp_path / "a.txt"
second = tmp_path / "b.txt"
first.write_text("a", encoding="UTF-8")
second.write_text("b", encoding="UTF-8")
action = make_action(tmp_path, style="cyan")
options = action.get_options([first, second])
assert list(options) == ["0", "1"]
assert options["0"] == SelectionOption(
description="a.txt",
value=first,
style="cyan",
)
assert options["1"].description == "b.txt"
assert options["1"].value == second
def test_find_cancel_key_returns_first_numeric_gap_or_next_index(tmp_path: Path) -> None:
action = make_action(tmp_path)
assert action._find_cancel_key({"0": object(), "2": object()}) == "1"
assert action._find_cancel_key({"0": object(), "1": object()}) == "2"
assert action._find_cancel_key({}) == "0"
def test_get_infer_target_disables_signature_inference(tmp_path: Path) -> None:
action = make_action(tmp_path)
assert action.get_infer_target() == (None, None)
@pytest.mark.parametrize(
("return_type", "file_key", "expected"),
[
(FileType.TEXT, "text", "hello\n"),
(FileType.PATH, "text", "PATH"),
(FileType.JSON, "json", {"name": "falyx", "count": 2}),
(FileType.YAML, "yaml", {"name": "falyx", "enabled": True}),
(FileType.TOML, "toml", {"name": "falyx", "count": 2}),
(FileType.CSV, "csv", [["name", "count"], ["falyx", "2"]]),
(FileType.TSV, "tsv", [["name", "count"], ["falyx", "2"]]),
],
)
def test_parse_file_returns_requested_representation(
tmp_path: Path,
return_type: FileType,
file_key: str,
expected: Any,
) -> None:
files = write_sample_files(tmp_path)
action = make_action(tmp_path, return_type=return_type)
result = action.parse_file(files[file_key])
if expected == "PATH":
assert result == files[file_key]
else:
assert result == expected
def test_parse_file_returns_xml_root(tmp_path: Path) -> None:
files = write_sample_files(tmp_path)
action = make_action(tmp_path, return_type=FileType.XML)
result = action.parse_file(files["xml"])
assert isinstance(result, ET.Element)
assert result.tag == "root"
assert result.findtext("name") == "falyx"
def test_clone_preserves_configuration_but_returns_distinct_action(
tmp_path: Path,
) -> None:
session = DummyPromptSession()
action = make_action(
tmp_path,
title="Pick a data file",
columns=2,
prompt_message="Select > ",
style="magenta",
suffix_filter=".json",
return_type=FileType.JSON,
encoding="utf-8",
number_selections=2,
separator="|",
allow_duplicates=True,
prompt_session=session,
never_prompt=True,
)
clone = action.clone()
assert clone is not action
assert clone.name == action.name
assert clone.directory == action.directory
assert clone.title == action.title
assert clone.columns == action.columns
assert clone.prompt_message == action.prompt_message
assert clone.style == action.style
assert clone.suffix_filter == action.suffix_filter
assert clone.return_type == action.return_type
assert clone.encoding == action.encoding
assert clone.number_selections == action.number_selections
assert clone.separator == action.separator
assert clone.allow_duplicates == action.allow_duplicates
assert clone.prompt_session is session
assert clone.local_never_prompt is True
@pytest.mark.asyncio
async def test_preview_prints_tree_when_no_parent_is_given(tmp_path: Path) -> None:
write_sample_files(tmp_path)
action = make_action(tmp_path, suffix_filter=".json", return_type=FileType.JSON)
action.console = CaptureConsole()
await action.preview()
assert len(action.console.printed) == 1
printed_tree = action.console.printed[0][0][0]
assert isinstance(printed_tree, Tree)
@pytest.mark.asyncio
async def test_preview_adds_to_existing_parent_and_limits_file_sample(
tmp_path: Path,
) -> None:
for index in range(12):
(tmp_path / f"config-{index}.json").write_text("{}", encoding="UTF-8")
(tmp_path / "ignore.txt").write_text("ignored", encoding="UTF-8")
action = make_action(tmp_path, suffix_filter=".json", return_type=FileType.JSON)
parent = Tree("root")
await action.preview(parent=parent)
assert len(parent.children) == 1
action_tree = parent.children[0]
rendered_labels = [str(child.label) for child in action_tree.children]
assert any("Suffix filter" in label and ".json" in label for label in rendered_labels)
file_list = next(
child for child in action_tree.children if str(child.label) == "[dim]Files:[/]"
)
assert len(file_list.children) == 11
assert "... (2 more)" in str(file_list.children[-1].label)
@pytest.mark.asyncio
async def test_preview_reports_directory_scan_errors(tmp_path: Path) -> None:
missing_dir = tmp_path / "missing"
action = make_action(missing_dir)
parent = Tree("root")
await action.preview(parent=parent)
action_tree = parent.children[0]
assert any(
"Error scanning directory" in str(child.label) for child in action_tree.children
)
@pytest.mark.asyncio
async def test_run_raises_for_missing_directory_and_triggers_error_lifecycle(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
action = make_action(tmp_path / "missing")
calls = register_lifecycle_hooks(action)
recorded: list[Any] = []
monkeypatch.setattr(select_file_module.er, "record", recorded.append)
with pytest.raises(FileNotFoundError, match="does not exist"):
await action("arg", flag=True)
assert hook_types(calls) == [
HookType.BEFORE,
HookType.ON_ERROR,
HookType.AFTER,
HookType.ON_TEARDOWN,
]
assert recorded
assert isinstance(recorded[0].exception, FileNotFoundError)
assert recorded[0].args == ("arg",)
assert recorded[0].kwargs == {"flag": True}
@pytest.mark.asyncio
async def test_run_raises_when_directory_path_is_file(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
directory_path = tmp_path / "not-a-dir.txt"
directory_path.write_text("not a directory", encoding="UTF-8")
action = make_action(directory_path)
calls = register_lifecycle_hooks(action)
monkeypatch.setattr(select_file_module.er, "record", lambda context: None)
with pytest.raises(NotADirectoryError, match="is not a directory"):
await action()
assert HookType.ON_ERROR in hook_types(calls)
@pytest.mark.asyncio
async def test_run_raises_when_suffix_filter_matches_no_files(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
(tmp_path / "note.txt").write_text("hello", encoding="UTF-8")
action = make_action(tmp_path, suffix_filter=".json")
calls = register_lifecycle_hooks(action)
monkeypatch.setattr(select_file_module.er, "record", lambda context: None)
with pytest.raises(FileNotFoundError, match="No files found"):
await action()
assert HookType.ON_ERROR in hook_types(calls)
@pytest.mark.asyncio
async def test_run_single_selection_returns_parsed_file_and_passes_prompt_options(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
selected = tmp_path / "note.txt"
selected.write_text("selected", encoding="UTF-8")
(tmp_path / "other.json").write_text("{}", encoding="UTF-8")
action = make_action(
tmp_path,
suffix_filter=".txt",
return_type=FileType.TEXT,
number_selections=1,
separator=";",
allow_duplicates=True,
)
calls = register_lifecycle_hooks(action)
recorded: list[Any] = []
prompt_calls: list[dict[str, Any]] = []
render_calls: list[dict[str, Any]] = []
monkeypatch.setattr(select_file_module.er, "record", recorded.append)
def fake_render_selection_dict_table(**kwargs: Any) -> object:
render_calls.append(kwargs)
return object()
async def fake_prompt_for_selection(valid_keys, table, **kwargs: Any) -> str:
prompt_calls.append({"valid_keys": list(valid_keys), "table": table, **kwargs})
return "0"
monkeypatch.setattr(
select_file_module,
"render_selection_dict_table",
fake_render_selection_dict_table,
)
monkeypatch.setattr(
select_file_module,
"prompt_for_selection",
fake_prompt_for_selection,
)
result = await action()
assert result == "selected"
assert hook_types(calls) == [
HookType.BEFORE,
HookType.ON_SUCCESS,
HookType.AFTER,
HookType.ON_TEARDOWN,
]
assert recorded[0].result == "selected"
assert render_calls[0]["title"] == action.title
assert render_calls[0]["columns"] == action.columns
assert set(render_calls[0]["selections"]) == {"0", "1"}
assert prompt_calls[0]["valid_keys"] == ["0", "1"]
assert prompt_calls[0]["prompt_session"] is action.prompt_session
assert prompt_calls[0]["prompt_message"] == action.prompt_message
assert prompt_calls[0]["number_selections"] == 1
assert prompt_calls[0]["separator"] == ";"
assert prompt_calls[0]["allow_duplicates"] is True
assert prompt_calls[0]["cancel_key"] == "1"
@pytest.mark.asyncio
async def test_run_multi_selection_returns_results_for_each_selected_file(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
first = tmp_path / "a.txt"
second = tmp_path / "b.txt"
first.write_text("a", encoding="UTF-8")
second.write_text("b", encoding="UTF-8")
action = make_action(tmp_path, return_type=FileType.PATH, number_selections=2)
monkeypatch.setattr(select_file_module.er, "record", lambda context: None)
monkeypatch.setattr(
select_file_module,
"render_selection_dict_table",
lambda **kwargs: object(),
)
async def fake_prompt_for_selection(*args: Any, **kwargs: Any) -> list[str]:
return ["0", "1"]
monkeypatch.setattr(
select_file_module,
"prompt_for_selection",
fake_prompt_for_selection,
)
result = await action()
print(result)
assert result == [first, second] or result == [second, first]
@pytest.mark.asyncio
async def test_run_cancel_selection_raises_cancel_signal_and_skips_error_hook(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
(tmp_path / "a.txt").write_text("a", encoding="UTF-8")
action = make_action(tmp_path)
calls = register_lifecycle_hooks(action)
recorded: list[Any] = []
monkeypatch.setattr(select_file_module.er, "record", recorded.append)
monkeypatch.setattr(
select_file_module,
"render_selection_dict_table",
lambda **kwargs: object(),
)
async def fake_prompt_for_selection(*args: Any, **kwargs: Any) -> str:
return kwargs["cancel_key"]
monkeypatch.setattr(
select_file_module,
"prompt_for_selection",
fake_prompt_for_selection,
)
with pytest.raises(CancelSignal, match="User canceled"):
await action()
assert hook_types(calls) == [
HookType.BEFORE,
HookType.AFTER,
HookType.ON_TEARDOWN,
]
assert recorded
assert recorded[0].exception is None
def assert_parse_file_value_error(
action: SelectFileAction,
file: Path,
*,
expected_cause_type: (
type[BaseException] | tuple[type[BaseException], ...] | None
) = None,
) -> ValueError:
with pytest.raises(ValueError) as exc_info:
action.parse_file(file)
error = exc_info.value
assert f"Failed to parse {file.name} as" in str(error)
assert error.__cause__ is not None
if expected_cause_type is not None:
assert isinstance(error.__cause__, expected_cause_type)
return error
def test_parse_file_wraps_invalid_json_errors(tmp_path: Path) -> None:
import json
broken = tmp_path / "broken.json"
broken.write_text('{"name": ', encoding="UTF-8")
action = make_action(tmp_path, return_type=FileType.JSON)
assert_parse_file_value_error(
action, broken, expected_cause_type=json.JSONDecodeError
)
def test_parse_file_wraps_invalid_toml_errors(tmp_path: Path) -> None:
broken = tmp_path / "broken.toml"
broken.write_text('name = "falyx"\ncount = ', encoding="UTF-8")
action = make_action(tmp_path, return_type=FileType.TOML)
assert_parse_file_value_error(
action, broken, expected_cause_type=toml.TomlDecodeError
)
def test_parse_file_wraps_invalid_yaml_errors(tmp_path: Path) -> None:
broken = tmp_path / "broken.yaml"
broken.write_text("name: [unterminated\n", encoding="UTF-8")
action = make_action(tmp_path, return_type=FileType.YAML)
assert_parse_file_value_error(action, broken, expected_cause_type=yaml.YAMLError)
def test_parse_file_wraps_invalid_xml_errors(tmp_path: Path) -> None:
broken = tmp_path / "broken.xml"
broken.write_text("<root><name>falyx</root>", encoding="UTF-8")
action = make_action(tmp_path, return_type=FileType.XML)
assert_parse_file_value_error(action, broken, expected_cause_type=ET.ParseError)
@pytest.mark.parametrize(
"return_type",
[
FileType.TEXT,
FileType.JSON,
FileType.YAML,
FileType.TOML,
FileType.CSV,
FileType.TSV,
FileType.XML,
],
)
def test_parse_file_wraps_missing_file_errors(
tmp_path: Path, return_type: FileType
) -> None:
missing = tmp_path / "missing.data"
action = make_action(tmp_path, return_type=return_type)
assert_parse_file_value_error(action, missing, expected_cause_type=FileNotFoundError)
@pytest.mark.parametrize("return_type", [FileType.CSV, FileType.TSV])
def test_parse_file_wraps_csv_style_open_errors(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
return_type: FileType,
) -> None:
data_file = tmp_path / "rows.data"
data_file.write_text("name,count\nfalyx,2\n", encoding="UTF-8")
action = make_action(tmp_path, return_type=return_type)
def fake_open(*args: Any, **kwargs: Any) -> Any:
raise OSError("cannot open test file")
monkeypatch.setattr("builtins.open", fake_open)
error = assert_parse_file_value_error(action, data_file, expected_cause_type=OSError)
assert "cannot open test file" in str(error)
def test_parse_file_wraps_unsupported_return_type_errors(tmp_path: Path) -> None:
data_file = tmp_path / "note.txt"
data_file.write_text("hello", encoding="UTF-8")
action = make_action(tmp_path, return_type=FileType.TEXT)
action.return_type = object() # Force the defensive unsupported-type branch.
error = assert_parse_file_value_error(
action, data_file, expected_cause_type=ValueError
)
assert "Unsupported return type" in str(error.__cause__)