Add examples of prompt_toolkit

This commit is contained in:
Roland Thomas Jr 2024-05-26 00:36:58 -04:00
parent 11f98d078f
commit f6ee4292ea
Signed by: roland
GPG Key ID: 7C3C2B085A4C2872
11 changed files with 715 additions and 5 deletions

26
api/slow_api.py Normal file
View File

@ -0,0 +1,26 @@
""" Module that mocks a slow API. """
from flask import Flask, request
from time import sleep
from random import randint
app = Flask(__name__)
@app.route('/slow')
def slow():
sleep(randint(1, 5))
return 'Slow response'
@app.route('/fast', methods=['GET', 'POST'])
def fast():
""" If GET request is made to /fast, the server will return a fast response.
If POST request is made to /fast, it will save the data to a file and return a fast response."""
if request.method == 'POST':
data = request.data.decode('utf-8')
with open('data.txt', 'a') as file:
file.write(f"{data}\n")
return 'Data saved!'
return 'Fast response'
if __name__ == '__main__':
app.run(port=5597)

65
cli/builder.py Executable file
View File

@ -0,0 +1,65 @@
from __future__ import print_function, unicode_literals
from PyInquirer import prompt, print_json
def ask_questions():
questions = [
{
'type': 'input',
'name': 'name',
'message': 'What\'s your name',
},
{
'type': 'input',
'name': 'age',
'message': 'How old are you',
},
{
'type': 'input',
'name': 'city',
'message': 'Where do you live',
},
]
answers = prompt(questions)
print_json(answers)
print('Hello {name}, you are {age} years old and live in {city}'.format(**answers))
print(f"Hello {answers['name']}, you are {answers['age']} years old and live in {answers['city']}")
def choose_option():
questions = [
{
'type': 'list',
'name': 'theme',
'message': 'What do you want to do',
'choices': [
'Order a pizza',
'Make a reservation',
'Ask for opening hours',
'Contact support',
'Talk to the receptionist',
]
}
]
answers = prompt(questions)
print_json(answers)
def editor_args():
questions = [
{
'type': 'editor',
'name': 'bio',
'message': 'Please write a short bio of at least 3 lines',
'validate': lambda text: len(text.split('\n')) >= 3 or 'Must be at least 3 lines.'
}
]
answers = prompt(questions)
print_json(answers)
if __name__ == '__main__':
#ask_questions()
#choose_option()
editor_args()

27
cli/foo.py Executable file
View File

@ -0,0 +1,27 @@
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import Completer, Completion
class Completer(Completer):
def get_completions(self, document, complete_event):
yield Completion('foo', start_position=0, style='bg:ansiyellow fg:ansiblack')
yield Completion('bar', start_position=0, style='underline')
class Foo:
def __init__(self):
self.session = PromptSession()
def run(self):
print(self.multi_line_prompt())
def multi_line_prompt(self):
""" Prompt the user for input, allowing multiple lines with default text. """
return self.session.prompt('Give me some input: ', multiline=True, default='Hello\nWorld\n', completer=Completer())
def main():
foo = Foo()
foo.run()
if __name__ == '__main__':
main()

97
cli/in_put.py Executable file
View File

@ -0,0 +1,97 @@
import time
from prompt_toolkit.formatted_text import HTML, merge_formatted_text
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.key_binding.key_processor import KeyPressEvent as E
from prompt_toolkit.keys import Keys
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.shortcuts.progress_bar import formatters
from prompt_toolkit.styles import Style
style = Style.from_dict(
{
"title": "#D08770 underline",
"label": "#D8DEE9 bold",
"percentage": "#D08770",
"bar-a": "bg:#D08770 #D08770",
"bar-b": "bg:#D08770 #2E3440",
"bar-c": "#D8DEE9",
"current": "#D8DEE9",
"total": "#D08770",
"time-elapsed": "#D8DEE9",
"time-left": "#D08770",
}
)
custom_formatters = [
formatters.Label(suffix=": "),
formatters.Bar(start="|", end="|", sym_a="\u2588", sym_b="\u2588", sym_c="\u2591"),
formatters.Text(" "),
formatters.Progress(),
formatters.Text(" "),
formatters.Percentage(),
formatters.Text(" [elapsed: "),
formatters.TimeElapsed(),
formatters.Text(" left: "),
formatters.TimeLeft(),
formatters.Text("]"),
]
def get_toolbar():
return f"time: {time.ctime():<30}"
def create_confirm_session(
message: str, suffix: str = " (y/n) "
) -> PromptSession[bool]:
"""
Create a `PromptSession` object for the 'confirm' function.
"""
bindings = KeyBindings()
@bindings.add("y")
@bindings.add("Y")
def yes(event: E) -> None:
session.default_buffer.text = "y"
event.app.exit(result=True)
@bindings.add("n")
@bindings.add("N")
def no(event: E) -> None:
session.default_buffer.text = "n"
event.app.exit(result=False)
@bindings.add("enter")
def enter(event: E) -> None:
"Accept the current value."
session.default_buffer.text = "y"
event.app.exit(result=True)
@bindings.add(Keys.Any)
def _(event: E) -> None:
"Disallow inserting other text."
pass
complete_message = merge_formatted_text([message, suffix])
session: PromptSession[bool] = PromptSession(
complete_message, key_bindings=bindings
)
return session
def confirm(message: str = "Confirm?", suffix: str = " (y/n) ") -> bool:
"""
Display a confirmation prompt that returns True/False.
"""
session = create_confirm_session(message, suffix)
return session.prompt()
def confirm_async(message: str = "Confirm?", suffix: str = " (y/n) ") -> bool:
"""
Display a confirmation prompt that returns True/False.
"""
session = create_confirm_session(message, suffix)
return session.prompt_async()

70
cli/jql_lexer.py Executable file
View File

@ -0,0 +1,70 @@
from prompt_toolkit.lexers import Lexer
from prompt_toolkit.styles import Style
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import StyleAndTextTuples
from typing import Callable
class JQLLexer(Lexer):
def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]:
text = document.text
tokens = []
keywords = {
"AND", "OR", "NOT", "IN", "ORDER BY", "ASC", "DESC",
"IS", "NULL", "TRUE", "FALSE", "EMPTY"
}
operators = {
"=", "!", ">", "<", ">=", "<=", "~", "!~", "!="
}
punctuations = {"(", ")", ",", ":", " "}
pos = 0
word = ''
while pos < len(text):
char = text[pos]
if char.isalpha():
word += char
else:
if word:
if word.upper() in keywords:
tokens.append(('class:keyword', word))
else:
tokens.append(('class:name', word))
word = ''
if char in operators:
tokens.append(('class:operator', char))
elif char in punctuations:
tokens.append(('class:punctuation', char))
elif char.isspace():
tokens.append(('class:text', char))
else:
tokens.append(('class:error', char))
pos += 1
if word:
if word.upper() in keywords:
tokens.append(('class:keyword', word))
else:
tokens.append(('class:name', word))
return lambda i: tokens
# Example usage
from prompt_toolkit import PromptSession
custom_style = Style.from_dict({
'keyword': '#ff0066 bold',
'operator': '#00ff00',
'name': '#0000ff',
'punctuation': '#00ffff',
'text': '#ffffff',
'error': '#ff0000 bold',
})
session = PromptSession(lexer=JQLLexer(), style=custom_style)
text = session.prompt('Enter JQL: ')
print(f'You entered: {text}')

129
cli/progress_bar.py Executable file
View File

@ -0,0 +1,129 @@
#!/usr/bin/env python
""" progress_bar.py
This module demonstrates how to use the prompt_toolkit ProgressBar to display
the progress of a list of tasks. The progress bar is only updated when tasks
are done not when started.
Tasks use ThreadPoolExecutor for concurrency.
"""
import time
import os
import signal
from concurrent.futures import as_completed, ThreadPoolExecutor
from random import randint
from prompt_toolkit.shortcuts import ProgressBar
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.patch_stdout import patch_stdout
import requests
from threading import Lock
from in_put import confirm, style, custom_formatters, get_toolbar
class Forge:
""" Forge class
This class demonstrates how to use the prompt_toolkit ProgressBar to display
the progress of a list of tasks. The progress bar is only updated when tasks
are done not when started.
"""
kb = KeyBindings()
cancel = False
def __init__(self):
self.lock = Lock()
self.tasks_done = 0
self.fast_tasks_done = 0
self.slow_tasks_done = 0
self.fast_task_count = randint(50, 400)
self.slow_task_count = randint(50, 100)
self.total_tasks = self.fast_task_count + self.slow_task_count
@kb.add("q")
def _(event):
cancel = True
@kb.add("f")
def _(event):
print("f")
@kb.add("x")
def _(event):
os.kill(os.getpid(), signal.SIGINT)
def save_data(self, future):
data = future.result()
with open("data.txt", "a") as file:
file.write(f"{data}\n")
time.sleep(randint(1, 3))
with self.lock:
self.slow_tasks_done += 1
def send_data(self, future):
data = future.result()
result = requests.post("http://localhost:5597/fast", data=data)
if result.text == "Data saved!":
with self.lock:
self.fast_tasks_done += 1
def fast_task(self):
result = requests.get("http://localhost:5597/fast")
return result.text
def slow_task(self):
result = requests.get("http://localhost:5597/slow")
return result.text
def run(self):
""" Keeps track of the progress of a list of tasks using a seperate progress bar
for each task. The progress bar is only update when tasks are done not when started.
"""
executor = ThreadPoolExecutor()
fast_futures = []
slow_futures = []
# Print number of fast, slow, and total tasks
print(f"Fast tasks: {self.fast_task_count}")
print(f"Slow tasks: {self.slow_task_count}")
print(f"Total tasks: {self.total_tasks}")
with patch_stdout():
with ProgressBar(
title="Forge",
formatters=custom_formatters,
style=style,
bottom_toolbar=get_toolbar,
key_bindings=self.kb,
) as pb:
task_progress = pb(range(self.total_tasks), label="Tasks")
slow_progress = pb(range(self.slow_task_count), label="Slow tasks")
for _ in range(self.slow_task_count):
fast_futures.append(executor.submit(self.slow_task))
fast_futures[-1].add_done_callback(self.save_data)
fast_progress = pb(range(self.fast_task_count), label="Fast tasks")
for _ in range(self.fast_task_count):
slow_futures.append(executor.submit(self.fast_task))
slow_futures[-1].add_done_callback(self.send_data)
while not (fast_progress.done and slow_progress.done):
time.sleep(0.1)
with self.lock:
slow_progress.items_completed = self.slow_tasks_done
fast_progress.items_completed = self.fast_tasks_done
task_progress.items_completed = self.slow_tasks_done + self.fast_tasks_done
if self.fast_tasks_done == self.fast_task_count:
fast_progress.done = True
if self.slow_tasks_done == self.slow_task_count:
slow_progress.done = True
executor.shutdown()
result = confirm("Do you want to print the data?")
if result:
with open("data.txt", "r") as file:
print(file.read())
def main():
forge = Forge()
forge.run()
if __name__ == "__main__":
main()

126
cli/progress_bar_async.py Executable file
View File

@ -0,0 +1,126 @@
#!/usr/bin/env python
""" progress_bar.py
This module demonstrates how to use the prompt_toolkit ProgressBar to display
the progress of a list of tasks. The progress bar is only updated when tasks
are done not when started.
Tasks use asyncio for concurrency.
"""
import asyncio
import os
import signal
from random import randint
from threading import Lock
import aiohttp
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.shortcuts import ProgressBar
from in_put import confirm_async as confirm
from in_put import custom_formatters, get_toolbar, style
class Forge:
""" Forge class
This class demonstrates how to use the prompt_toolkit ProgressBar to display
the progress of a list of tasks. The progress bar is only updated when tasks
are done not when started.
"""
kb = KeyBindings()
cancel = False
def __init__(self):
self.lock = Lock()
self.tasks_done = 0
self.fast_tasks_done = 0
self.slow_tasks_done = 0
self.fast_task_count = randint(50, 400)
self.slow_task_count = randint(50, 100)
self.total_tasks = self.fast_task_count + self.slow_task_count
@kb.add("q")
def _(event):
cancel = True
@kb.add("f")
def _(event):
print("f")
@kb.add("x")
def _(event):
os.kill(os.getpid(), signal.SIGINT)
async def save_data(self, data):
with open("data.txt", "a") as file:
file.write(f"{data}\n")
await asyncio.sleep(randint(5, 15))
self.slow_tasks_done += 1
async def send_data(self, data):
async with aiohttp.ClientSession() as session:
async with session.post("http://localhost:5597/fast", data=data) as result:
if await result.text() == "Data saved!":
self.fast_tasks_done += 1
async def fast_task(self):
async with aiohttp.ClientSession() as session:
async with session.get("http://localhost:5597/fast") as result:
return await self.send_data(await result.text())
async def slow_task(self):
async with aiohttp.ClientSession() as session:
async with session.get("http://localhost:5597/slow") as result:
return await self.save_data(await result.text())
async def run(self):
""" Keeps track of the progress of a list of tasks using a seperate progress bar
for each task. The progress bar is only update when tasks are done not when started.
"""
slow_tasks = []
fast_tasks = []
# Print number of fast, slow, and total tasks
print(f"Fast tasks: {self.fast_task_count}")
print(f"Slow tasks: {self.slow_task_count}")
print(f"Total tasks: {self.total_tasks}")
with patch_stdout():
with ProgressBar(
title="Forge",
formatters=custom_formatters,
style=style,
bottom_toolbar=get_toolbar,
key_bindings=self.kb,
) as pb:
task_progress = pb(range(self.total_tasks), label="Tasks")
slow_progress = pb(range(self.slow_task_count), label="Slow tasks")
for _ in range(self.slow_task_count):
slow_tasks.append(asyncio.create_task(self.slow_task()))
fast_progress = pb(range(self.fast_task_count), label="Fast tasks")
for _ in range(self.fast_task_count):
fast_tasks.append(asyncio.create_task(self.fast_task()))
while not (fast_progress.done and slow_progress.done):
await asyncio.sleep(0.1)
slow_progress.items_completed = self.slow_tasks_done
fast_progress.items_completed = self.fast_tasks_done
task_progress.items_completed = self.slow_tasks_done + self.fast_tasks_done
if self.fast_tasks_done == self.fast_task_count:
fast_progress.done = True
if self.slow_tasks_done == self.slow_task_count:
slow_progress.done = True
result = await confirm("Do you want to print the data?")
if result:
with open("data.txt", "r") as file:
print(file.read())
def main():
forge = Forge()
asyncio.run(forge.run())
if __name__ == "__main__":
main()

54
cli/rprompt.py Executable file
View File

@ -0,0 +1,54 @@
#!/usr/bin/env python
"""
Example of a right prompt. This is an additional prompt that is displayed on
the right side of the terminal. It will be hidden automatically when the input
is long enough to cover the right side of the terminal.
This is similar to RPROMPT is Zsh.
"""
import time
from prompt_toolkit import prompt
from prompt_toolkit.formatted_text import ANSI, HTML
from prompt_toolkit.styles import Style
example_style = Style.from_dict(
{
# The 'rprompt' gets by default the 'rprompt' class. We can use this
# for the styling.
"rprompt": "bg:#D08770 #ffffff",
}
)
def get_rprompt_text():
return [
("", " "),
("underline", f"{time.ctime()}"),
("", " "),
]
def main():
# Option 1: pass a string to 'rprompt':
answer = prompt("> ", rprompt=" <rprompt> ", style=example_style)
print(f"You said: {answer}")
# Option 2: pass HTML:
answer = prompt("> ", rprompt=HTML(" <u>&lt;rprompt&gt;</u> "), style=example_style)
print(f"You said: {answer}")
# Option 3: pass ANSI:
answer = prompt(
"> ", rprompt=ANSI(" \x1b[4m<rprompt>\x1b[0m "), style=example_style
)
print(f"You said: {answer}")
# Option 4: Pass a callable. (This callable can either return plain text,
# an HTML object, an ANSI object or a list of (style, text)
# tuples.
answer = prompt("> ", rprompt=get_rprompt_text, style=example_style, refresh_interval=1)
print(f"You said: {answer}")
if __name__ == "__main__":
main()

41
isomorphic_strings.py Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
from collections import Counter
def isIsometric(s, t):
if len(s) != len(t):
return False
s_dict = {}
t_dict = {}
for index in range(len(s)):
if s[index] not in s_dict:
s_dict[s[index]] = t[index]
if t[index] not in t_dict:
t_dict[t[index]] = s[index]
if s_dict[s[index]] != t[index] or t_dict[t[index]] != s[index]:
return False
return True
if __name__ == '__main__':
s = 'egg'
t = 'add'
print(isIsometric(s, t)) # True
s = 'foo'
t = 'bar'
print(isIsometric(s, t)) # False
s = 'paper'
t = 'title'
print(isIsometric(s, t)) # True
s = 'ab'
t = 'aa'
print(isIsometric(s, t)) # False
s = 'bbbaaaba'
t = 'aaabbbba'
print(isIsometric(s, t)) # False

14
meeting.py Normal file
View File

@ -0,0 +1,14 @@
from collections import defaultdict
def meeting(s):
result = [tuple(word.split(":")) for word in s.split(";")]
reorder = sorted([(tup[1].upper(),tup[0].upper()) for tup in result])
names = defaultdict(lambda:[])
for tup in reorder:
names[tup[0]].append(tup)
names_list = []
for _, value in names.items():
names_list.extend(sorted(value, key=lambda tup: tup[0]))
get_string = [f"({tup[0]}, {tup[1]})" for tup in names_list]
return "".join(get_string)
print(meeting("Alexis:Wahl;John:Bell;Victoria:Schwarz;Abba:Dorny;Grace:Meta;Ann:Arno;Madison:STAN;Alex:Cornwell;Lewis:Kern;Megan:Stan;Alex:Korn"))

View File

@ -1,10 +1,32 @@
#!/usr/bin/env python3
import timeit
from functools import partial
from time import sleep
from typing import List
from random import randint
from typing import Dict
from concurrent.futures import ThreadPoolExecutor, Future
results: list[int] = []
from prompt_toolkit.shortcuts import ProgressBar
from pydantic import BaseModel
results: List[int] = []
class Result(BaseModel):
result: int
def __repr__(self) -> str:
return f"Result: {self.result} {id(self)}"
class Requests:
requests: Dict[str, Result] = {}
odd_requests: Dict[str, Result] = {}
even_requests: Dict[str, Result] = {}
def __repr__(self) -> str:
return f"Requests: {self.requests}\nOdd requests: {self.odd_requests}\nEven requests: {self.even_requests}"
def add_one(number: int) -> int:
@ -18,10 +40,11 @@ def aggregate_results(future: Future):
def first_method():
with ThreadPoolExecutor(32) as executor:
futures = [executor.submit(add_one, number) for number in range(10)]
for future in futures:
future.add_done_callback(aggregate_results)
with ProgressBar() as pb:
with ThreadPoolExecutor(32) as executor:
futures = [executor.submit(add_one, number) for number in range(10)]
for future in pb(futures, label="Processing tasks..."):
future.add_done_callback(aggregate_results)
def second_method():
@ -32,6 +55,42 @@ def second_method():
futures[-1].add_done_callback(aggregate_results)
def add_one_result(result: Result) -> Result:
result.result += 1
return result
def add_one_fut(future: Future):
result = future.result()
if result.result % 2 == 0:
Requests.even_requests[str(result.result)] = result
else:
Requests.odd_requests[str(result.result)] = result
def third_method():
for number in range(10):
result = Result(result=randint(0, 100))
Requests.requests[str(number)] = result
if result.result % 2 == 0:
Requests.even_requests[str(result.result)] = result
else:
Requests.odd_requests[str(result.result)] = result
futures = []
with ThreadPoolExecutor(32) as executor:
for _, request in Requests.requests.items():
future = executor.submit(add_one_result, request)
future.add_done_callback(add_one_fut)
futures.append(future)
print("Done waiting!")
print(Requests.requests)
print("odd:", Requests.odd_requests)
print("even:", Requests.even_requests)
print("All done!")
def main():
print(timeit.timeit(first_method, number=1))
print(results)
@ -41,6 +100,8 @@ def main():
print(results)
print("All done!")
third_method()
if __name__ == "__main__":
main()