Add Examples

This commit is contained in:
2025-04-16 21:52:40 -04:00
parent b9859c700f
commit 4969b7cfa8
10 changed files with 155 additions and 48 deletions

View File

@ -0,0 +1,29 @@
import asyncio
from falyx import Action, ActionGroup, ChainedAction
# Actions can be defined as synchronous functions
# Falyx will automatically convert them to async functions
def hello() -> None:
print("Hello, world!")
hello = Action(name="hello_action", action=hello)
# Actions can be run by themselves or as part of a command or pipeline
asyncio.run(hello())
# Actions are designed to be asynchronous first
async def goodbye() -> None:
print("Goodbye!")
goodbye = Action(name="goodbye_action", action=goodbye)
asyncio.run(goodbye())
# Actions can be run in parallel
group = ActionGroup(name="greeting_group", actions=[hello, goodbye])
asyncio.run(group())
# Actions can be run in a chain
chain = ChainedAction(name="greeting_chain", actions=[hello, goodbye])
asyncio.run(chain())

26
examples/process_pool.py Normal file
View File

@ -0,0 +1,26 @@
from falyx import Falyx, ProcessAction
from falyx.themes.colors import NordColors as nc
from rich.console import Console
console = Console()
falyx = Falyx(title="🚀 Process Pool Demo")
def generate_primes(n):
primes = []
for num in range(2, n):
if all(num % p != 0 for p in primes):
primes.append(num)
console.print(f"Generated {len(primes)} primes up to {n}.", style=nc.GREEN)
return primes
# Will not block the event loop
heavy_action = ProcessAction("Prime Generator", generate_primes, args=(100_000,))
falyx.add_command("R", "Generate Primes", heavy_action, spinner=True)
if __name__ == "__main__":
import asyncio
# Run the menu
asyncio.run(falyx.run())

36
examples/simple.py Normal file
View File

@ -0,0 +1,36 @@
import asyncio
import random
from falyx import Falyx, Action, ChainedAction
from falyx.utils import setup_logging
setup_logging()
# A flaky async step that fails randomly
async def flaky_step():
await asyncio.sleep(0.2)
if random.random() < 0.5:
raise RuntimeError("Random failure!")
return "ok"
# Create a retry handler
step1 = Action(name="step_1", action=flaky_step, retry=True)
step2 = Action(name="step_2", action=flaky_step, retry=True)
# Chain the actions
chain = ChainedAction(name="my_pipeline", actions=[step1, step2])
# Create the CLI menu
falyx = Falyx("🚀 Falyx Demo")
falyx.add_command(
key="R",
description="Run My Pipeline",
action=chain,
logging_hooks=True,
preview_before_confirm=True,
confirm=True,
)
# Entry point
if __name__ == "__main__":
asyncio.run(falyx.run())