From f6ee4292ea045b2c424057cb32b9111765fb22cb Mon Sep 17 00:00:00 2001 From: Roland Thomas Date: Sun, 26 May 2024 00:36:58 -0400 Subject: [PATCH] Add examples of prompt_toolkit --- api/slow_api.py | 26 ++++ cli/builder.py | 65 ++++++++++ cli/foo.py | 27 ++++ cli/in_put.py | 97 +++++++++++++++ cli/jql_lexer.py | 70 +++++++++++ cli/progress_bar.py | 129 ++++++++++++++++++++ cli/progress_bar_async.py | 126 +++++++++++++++++++ cli/rprompt.py | 54 ++++++++ isomorphic_strings.py | 41 +++++++ meeting.py | 14 +++ thread_pool_patterns/submit_use_callback.py | 71 ++++++++++- 11 files changed, 715 insertions(+), 5 deletions(-) create mode 100644 api/slow_api.py create mode 100755 cli/builder.py create mode 100755 cli/foo.py create mode 100755 cli/in_put.py create mode 100755 cli/jql_lexer.py create mode 100755 cli/progress_bar.py create mode 100755 cli/progress_bar_async.py create mode 100755 cli/rprompt.py create mode 100644 isomorphic_strings.py create mode 100644 meeting.py diff --git a/api/slow_api.py b/api/slow_api.py new file mode 100644 index 0000000..5a24fab --- /dev/null +++ b/api/slow_api.py @@ -0,0 +1,26 @@ +""" Module that mocks a slow API. """ +from flask import Flask, request +from time import sleep +from random import randint + +app = Flask(__name__) + +@app.route('/slow') +def slow(): + sleep(randint(1, 5)) + return 'Slow response' + +@app.route('/fast', methods=['GET', 'POST']) +def fast(): + """ If GET request is made to /fast, the server will return a fast response. + If POST request is made to /fast, it will save the data to a file and return a fast response.""" + if request.method == 'POST': + data = request.data.decode('utf-8') + with open('data.txt', 'a') as file: + file.write(f"{data}\n") + return 'Data saved!' + return 'Fast response' + +if __name__ == '__main__': + app.run(port=5597) + diff --git a/cli/builder.py b/cli/builder.py new file mode 100755 index 0000000..e603cdf --- /dev/null +++ b/cli/builder.py @@ -0,0 +1,65 @@ +from __future__ import print_function, unicode_literals +from PyInquirer import prompt, print_json + + +def ask_questions(): + questions = [ + { + 'type': 'input', + 'name': 'name', + 'message': 'What\'s your name', + }, + { + 'type': 'input', + 'name': 'age', + 'message': 'How old are you', + }, + { + 'type': 'input', + 'name': 'city', + 'message': 'Where do you live', + }, + ] + + answers = prompt(questions) + print_json(answers) + print('Hello {name}, you are {age} years old and live in {city}'.format(**answers)) + print(f"Hello {answers['name']}, you are {answers['age']} years old and live in {answers['city']}") + +def choose_option(): + questions = [ + { + 'type': 'list', + 'name': 'theme', + 'message': 'What do you want to do', + 'choices': [ + 'Order a pizza', + 'Make a reservation', + 'Ask for opening hours', + 'Contact support', + 'Talk to the receptionist', + ] + } + ] + + answers = prompt(questions) + print_json(answers) + +def editor_args(): + questions = [ + { + 'type': 'editor', + 'name': 'bio', + 'message': 'Please write a short bio of at least 3 lines', + 'validate': lambda text: len(text.split('\n')) >= 3 or 'Must be at least 3 lines.' + } + ] + + answers = prompt(questions) + print_json(answers) + + +if __name__ == '__main__': + #ask_questions() + #choose_option() + editor_args() diff --git a/cli/foo.py b/cli/foo.py new file mode 100755 index 0000000..39af6ac --- /dev/null +++ b/cli/foo.py @@ -0,0 +1,27 @@ +from prompt_toolkit import PromptSession +from prompt_toolkit.completion import Completer, Completion + + +class Completer(Completer): + def get_completions(self, document, complete_event): + yield Completion('foo', start_position=0, style='bg:ansiyellow fg:ansiblack') + yield Completion('bar', start_position=0, style='underline') + +class Foo: + def __init__(self): + self.session = PromptSession() + + def run(self): + print(self.multi_line_prompt()) + + + def multi_line_prompt(self): + """ Prompt the user for input, allowing multiple lines with default text. """ + return self.session.prompt('Give me some input: ', multiline=True, default='Hello\nWorld\n', completer=Completer()) + +def main(): + foo = Foo() + foo.run() + +if __name__ == '__main__': + main() diff --git a/cli/in_put.py b/cli/in_put.py new file mode 100755 index 0000000..4cb7cba --- /dev/null +++ b/cli/in_put.py @@ -0,0 +1,97 @@ +import time + +from prompt_toolkit.formatted_text import HTML, merge_formatted_text +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.key_binding.key_processor import KeyPressEvent as E +from prompt_toolkit.keys import Keys +from prompt_toolkit.shortcuts import PromptSession +from prompt_toolkit.shortcuts.progress_bar import formatters +from prompt_toolkit.styles import Style + +style = Style.from_dict( + { + "title": "#D08770 underline", + "label": "#D8DEE9 bold", + "percentage": "#D08770", + "bar-a": "bg:#D08770 #D08770", + "bar-b": "bg:#D08770 #2E3440", + "bar-c": "#D8DEE9", + "current": "#D8DEE9", + "total": "#D08770", + "time-elapsed": "#D8DEE9", + "time-left": "#D08770", + } +) + + +custom_formatters = [ + formatters.Label(suffix=": "), + formatters.Bar(start="|", end="|", sym_a="\u2588", sym_b="\u2588", sym_c="\u2591"), + formatters.Text(" "), + formatters.Progress(), + formatters.Text(" "), + formatters.Percentage(), + formatters.Text(" [elapsed: "), + formatters.TimeElapsed(), + formatters.Text(" left: "), + formatters.TimeLeft(), + formatters.Text("]"), +] + + +def get_toolbar(): + return f"time: {time.ctime():<30}" + + +def create_confirm_session( + message: str, suffix: str = " (y/n) " +) -> PromptSession[bool]: + """ + Create a `PromptSession` object for the 'confirm' function. + """ + bindings = KeyBindings() + + @bindings.add("y") + @bindings.add("Y") + def yes(event: E) -> None: + session.default_buffer.text = "y" + event.app.exit(result=True) + + @bindings.add("n") + @bindings.add("N") + def no(event: E) -> None: + session.default_buffer.text = "n" + event.app.exit(result=False) + + @bindings.add("enter") + def enter(event: E) -> None: + "Accept the current value." + session.default_buffer.text = "y" + event.app.exit(result=True) + + @bindings.add(Keys.Any) + def _(event: E) -> None: + "Disallow inserting other text." + pass + + complete_message = merge_formatted_text([message, suffix]) + session: PromptSession[bool] = PromptSession( + complete_message, key_bindings=bindings + ) + return session + + +def confirm(message: str = "Confirm?", suffix: str = " (y/n) ") -> bool: + """ + Display a confirmation prompt that returns True/False. + """ + session = create_confirm_session(message, suffix) + return session.prompt() + + +def confirm_async(message: str = "Confirm?", suffix: str = " (y/n) ") -> bool: + """ + Display a confirmation prompt that returns True/False. + """ + session = create_confirm_session(message, suffix) + return session.prompt_async() \ No newline at end of file diff --git a/cli/jql_lexer.py b/cli/jql_lexer.py new file mode 100755 index 0000000..fe6ac1b --- /dev/null +++ b/cli/jql_lexer.py @@ -0,0 +1,70 @@ +from prompt_toolkit.lexers import Lexer +from prompt_toolkit.styles import Style +from prompt_toolkit.document import Document +from prompt_toolkit.formatted_text import StyleAndTextTuples +from typing import Callable + +class JQLLexer(Lexer): + def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]: + text = document.text + tokens = [] + + keywords = { + "AND", "OR", "NOT", "IN", "ORDER BY", "ASC", "DESC", + "IS", "NULL", "TRUE", "FALSE", "EMPTY" + } + operators = { + "=", "!", ">", "<", ">=", "<=", "~", "!~", "!=" + } + punctuations = {"(", ")", ",", ":", " "} + + pos = 0 + word = '' + + while pos < len(text): + char = text[pos] + + if char.isalpha(): + word += char + else: + if word: + if word.upper() in keywords: + tokens.append(('class:keyword', word)) + else: + tokens.append(('class:name', word)) + word = '' + + if char in operators: + tokens.append(('class:operator', char)) + elif char in punctuations: + tokens.append(('class:punctuation', char)) + elif char.isspace(): + tokens.append(('class:text', char)) + else: + tokens.append(('class:error', char)) + pos += 1 + + if word: + if word.upper() in keywords: + tokens.append(('class:keyword', word)) + else: + tokens.append(('class:name', word)) + + return lambda i: tokens + +# Example usage +from prompt_toolkit import PromptSession + +custom_style = Style.from_dict({ + 'keyword': '#ff0066 bold', + 'operator': '#00ff00', + 'name': '#0000ff', + 'punctuation': '#00ffff', + 'text': '#ffffff', + 'error': '#ff0000 bold', +}) + +session = PromptSession(lexer=JQLLexer(), style=custom_style) + +text = session.prompt('Enter JQL: ') +print(f'You entered: {text}') diff --git a/cli/progress_bar.py b/cli/progress_bar.py new file mode 100755 index 0000000..5cbcf9a --- /dev/null +++ b/cli/progress_bar.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +""" progress_bar.py +This module demonstrates how to use the prompt_toolkit ProgressBar to display +the progress of a list of tasks. The progress bar is only updated when tasks +are done not when started. + +Tasks use ThreadPoolExecutor for concurrency. +""" +import time +import os +import signal +from concurrent.futures import as_completed, ThreadPoolExecutor +from random import randint +from prompt_toolkit.shortcuts import ProgressBar +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.patch_stdout import patch_stdout +import requests +from threading import Lock + +from in_put import confirm, style, custom_formatters, get_toolbar + + +class Forge: + """ Forge class + This class demonstrates how to use the prompt_toolkit ProgressBar to display + the progress of a list of tasks. The progress bar is only updated when tasks + are done not when started. + """ + kb = KeyBindings() + cancel = False + def __init__(self): + self.lock = Lock() + self.tasks_done = 0 + self.fast_tasks_done = 0 + self.slow_tasks_done = 0 + self.fast_task_count = randint(50, 400) + self.slow_task_count = randint(50, 100) + self.total_tasks = self.fast_task_count + self.slow_task_count + + @kb.add("q") + def _(event): + cancel = True + + @kb.add("f") + def _(event): + print("f") + + @kb.add("x") + def _(event): + os.kill(os.getpid(), signal.SIGINT) + + def save_data(self, future): + data = future.result() + with open("data.txt", "a") as file: + file.write(f"{data}\n") + time.sleep(randint(1, 3)) + with self.lock: + self.slow_tasks_done += 1 + + def send_data(self, future): + data = future.result() + result = requests.post("http://localhost:5597/fast", data=data) + if result.text == "Data saved!": + with self.lock: + self.fast_tasks_done += 1 + + def fast_task(self): + result = requests.get("http://localhost:5597/fast") + return result.text + + def slow_task(self): + result = requests.get("http://localhost:5597/slow") + return result.text + + def run(self): + """ Keeps track of the progress of a list of tasks using a seperate progress bar + for each task. The progress bar is only update when tasks are done not when started. + """ + executor = ThreadPoolExecutor() + fast_futures = [] + slow_futures = [] + + # Print number of fast, slow, and total tasks + print(f"Fast tasks: {self.fast_task_count}") + print(f"Slow tasks: {self.slow_task_count}") + print(f"Total tasks: {self.total_tasks}") + with patch_stdout(): + with ProgressBar( + title="Forge", + formatters=custom_formatters, + style=style, + bottom_toolbar=get_toolbar, + key_bindings=self.kb, + ) as pb: + task_progress = pb(range(self.total_tasks), label="Tasks") + slow_progress = pb(range(self.slow_task_count), label="Slow tasks") + for _ in range(self.slow_task_count): + fast_futures.append(executor.submit(self.slow_task)) + fast_futures[-1].add_done_callback(self.save_data) + + fast_progress = pb(range(self.fast_task_count), label="Fast tasks") + for _ in range(self.fast_task_count): + slow_futures.append(executor.submit(self.fast_task)) + slow_futures[-1].add_done_callback(self.send_data) + while not (fast_progress.done and slow_progress.done): + time.sleep(0.1) + with self.lock: + slow_progress.items_completed = self.slow_tasks_done + fast_progress.items_completed = self.fast_tasks_done + task_progress.items_completed = self.slow_tasks_done + self.fast_tasks_done + if self.fast_tasks_done == self.fast_task_count: + fast_progress.done = True + if self.slow_tasks_done == self.slow_task_count: + slow_progress.done = True + + executor.shutdown() + + result = confirm("Do you want to print the data?") + + if result: + with open("data.txt", "r") as file: + print(file.read()) + +def main(): + forge = Forge() + forge.run() + +if __name__ == "__main__": + main() diff --git a/cli/progress_bar_async.py b/cli/progress_bar_async.py new file mode 100755 index 0000000..49abf84 --- /dev/null +++ b/cli/progress_bar_async.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +""" progress_bar.py +This module demonstrates how to use the prompt_toolkit ProgressBar to display +the progress of a list of tasks. The progress bar is only updated when tasks +are done not when started. + +Tasks use asyncio for concurrency. +""" +import asyncio +import os +import signal +from random import randint +from threading import Lock + +import aiohttp +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.patch_stdout import patch_stdout +from prompt_toolkit.shortcuts import ProgressBar + +from in_put import confirm_async as confirm +from in_put import custom_formatters, get_toolbar, style + + +class Forge: + """ Forge class + This class demonstrates how to use the prompt_toolkit ProgressBar to display + the progress of a list of tasks. The progress bar is only updated when tasks + are done not when started. + """ + kb = KeyBindings() + cancel = False + def __init__(self): + self.lock = Lock() + self.tasks_done = 0 + self.fast_tasks_done = 0 + self.slow_tasks_done = 0 + self.fast_task_count = randint(50, 400) + self.slow_task_count = randint(50, 100) + self.total_tasks = self.fast_task_count + self.slow_task_count + + @kb.add("q") + def _(event): + cancel = True + + @kb.add("f") + def _(event): + print("f") + + @kb.add("x") + def _(event): + os.kill(os.getpid(), signal.SIGINT) + + async def save_data(self, data): + with open("data.txt", "a") as file: + file.write(f"{data}\n") + await asyncio.sleep(randint(5, 15)) + self.slow_tasks_done += 1 + + async def send_data(self, data): + async with aiohttp.ClientSession() as session: + async with session.post("http://localhost:5597/fast", data=data) as result: + if await result.text() == "Data saved!": + self.fast_tasks_done += 1 + + async def fast_task(self): + async with aiohttp.ClientSession() as session: + async with session.get("http://localhost:5597/fast") as result: + return await self.send_data(await result.text()) + + async def slow_task(self): + async with aiohttp.ClientSession() as session: + async with session.get("http://localhost:5597/slow") as result: + return await self.save_data(await result.text()) + + async def run(self): + """ Keeps track of the progress of a list of tasks using a seperate progress bar + for each task. The progress bar is only update when tasks are done not when started. + """ + slow_tasks = [] + fast_tasks = [] + + # Print number of fast, slow, and total tasks + print(f"Fast tasks: {self.fast_task_count}") + print(f"Slow tasks: {self.slow_task_count}") + print(f"Total tasks: {self.total_tasks}") + with patch_stdout(): + with ProgressBar( + title="Forge", + formatters=custom_formatters, + style=style, + bottom_toolbar=get_toolbar, + key_bindings=self.kb, + ) as pb: + task_progress = pb(range(self.total_tasks), label="Tasks") + slow_progress = pb(range(self.slow_task_count), label="Slow tasks") + for _ in range(self.slow_task_count): + slow_tasks.append(asyncio.create_task(self.slow_task())) + + fast_progress = pb(range(self.fast_task_count), label="Fast tasks") + for _ in range(self.fast_task_count): + fast_tasks.append(asyncio.create_task(self.fast_task())) + + while not (fast_progress.done and slow_progress.done): + await asyncio.sleep(0.1) + slow_progress.items_completed = self.slow_tasks_done + fast_progress.items_completed = self.fast_tasks_done + task_progress.items_completed = self.slow_tasks_done + self.fast_tasks_done + if self.fast_tasks_done == self.fast_task_count: + fast_progress.done = True + if self.slow_tasks_done == self.slow_task_count: + slow_progress.done = True + + result = await confirm("Do you want to print the data?") + + if result: + with open("data.txt", "r") as file: + print(file.read()) + + +def main(): + forge = Forge() + asyncio.run(forge.run()) + + +if __name__ == "__main__": + main() diff --git a/cli/rprompt.py b/cli/rprompt.py new file mode 100755 index 0000000..cf7b81d --- /dev/null +++ b/cli/rprompt.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +""" +Example of a right prompt. This is an additional prompt that is displayed on +the right side of the terminal. It will be hidden automatically when the input +is long enough to cover the right side of the terminal. + +This is similar to RPROMPT is Zsh. +""" +import time +from prompt_toolkit import prompt +from prompt_toolkit.formatted_text import ANSI, HTML +from prompt_toolkit.styles import Style + +example_style = Style.from_dict( + { + # The 'rprompt' gets by default the 'rprompt' class. We can use this + # for the styling. + "rprompt": "bg:#D08770 #ffffff", + } +) + + +def get_rprompt_text(): + return [ + ("", " "), + ("underline", f"{time.ctime()}"), + ("", " "), + ] + + +def main(): + # Option 1: pass a string to 'rprompt': + answer = prompt("> ", rprompt=" ", style=example_style) + print(f"You said: {answer}") + + # Option 2: pass HTML: + answer = prompt("> ", rprompt=HTML(" <rprompt> "), style=example_style) + print(f"You said: {answer}") + + # Option 3: pass ANSI: + answer = prompt( + "> ", rprompt=ANSI(" \x1b[4m\x1b[0m "), style=example_style + ) + print(f"You said: {answer}") + + # Option 4: Pass a callable. (This callable can either return plain text, + # an HTML object, an ANSI object or a list of (style, text) + # tuples. + answer = prompt("> ", rprompt=get_rprompt_text, style=example_style, refresh_interval=1) + print(f"You said: {answer}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/isomorphic_strings.py b/isomorphic_strings.py new file mode 100644 index 0000000..ccc947d --- /dev/null +++ b/isomorphic_strings.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +from collections import Counter + +def isIsometric(s, t): + if len(s) != len(t): + return False + + s_dict = {} + t_dict = {} + + for index in range(len(s)): + if s[index] not in s_dict: + s_dict[s[index]] = t[index] + if t[index] not in t_dict: + t_dict[t[index]] = s[index] + + if s_dict[s[index]] != t[index] or t_dict[t[index]] != s[index]: + return False + return True + + +if __name__ == '__main__': + s = 'egg' + t = 'add' + print(isIsometric(s, t)) # True + + s = 'foo' + t = 'bar' + print(isIsometric(s, t)) # False + + s = 'paper' + t = 'title' + print(isIsometric(s, t)) # True + + s = 'ab' + t = 'aa' + print(isIsometric(s, t)) # False + + s = 'bbbaaaba' + t = 'aaabbbba' + print(isIsometric(s, t)) # False diff --git a/meeting.py b/meeting.py new file mode 100644 index 0000000..6bca713 --- /dev/null +++ b/meeting.py @@ -0,0 +1,14 @@ +from collections import defaultdict +def meeting(s): + result = [tuple(word.split(":")) for word in s.split(";")] + reorder = sorted([(tup[1].upper(),tup[0].upper()) for tup in result]) + names = defaultdict(lambda:[]) + for tup in reorder: + names[tup[0]].append(tup) + names_list = [] + for _, value in names.items(): + names_list.extend(sorted(value, key=lambda tup: tup[0])) + get_string = [f"({tup[0]}, {tup[1]})" for tup in names_list] + return "".join(get_string) + +print(meeting("Alexis:Wahl;John:Bell;Victoria:Schwarz;Abba:Dorny;Grace:Meta;Ann:Arno;Madison:STAN;Alex:Cornwell;Lewis:Kern;Megan:Stan;Alex:Korn")) diff --git a/thread_pool_patterns/submit_use_callback.py b/thread_pool_patterns/submit_use_callback.py index 207b518..76c3e1c 100755 --- a/thread_pool_patterns/submit_use_callback.py +++ b/thread_pool_patterns/submit_use_callback.py @@ -1,10 +1,32 @@ #!/usr/bin/env python3 import timeit +from functools import partial from time import sleep +from typing import List from random import randint +from typing import Dict from concurrent.futures import ThreadPoolExecutor, Future -results: list[int] = [] +from prompt_toolkit.shortcuts import ProgressBar +from pydantic import BaseModel + +results: List[int] = [] + + +class Result(BaseModel): + result: int + + def __repr__(self) -> str: + return f"Result: {self.result} {id(self)}" + + +class Requests: + requests: Dict[str, Result] = {} + odd_requests: Dict[str, Result] = {} + even_requests: Dict[str, Result] = {} + + def __repr__(self) -> str: + return f"Requests: {self.requests}\nOdd requests: {self.odd_requests}\nEven requests: {self.even_requests}" def add_one(number: int) -> int: @@ -18,10 +40,11 @@ def aggregate_results(future: Future): def first_method(): - with ThreadPoolExecutor(32) as executor: - futures = [executor.submit(add_one, number) for number in range(10)] - for future in futures: - future.add_done_callback(aggregate_results) + with ProgressBar() as pb: + with ThreadPoolExecutor(32) as executor: + futures = [executor.submit(add_one, number) for number in range(10)] + for future in pb(futures, label="Processing tasks..."): + future.add_done_callback(aggregate_results) def second_method(): @@ -32,6 +55,42 @@ def second_method(): futures[-1].add_done_callback(aggregate_results) +def add_one_result(result: Result) -> Result: + result.result += 1 + return result + + +def add_one_fut(future: Future): + result = future.result() + if result.result % 2 == 0: + Requests.even_requests[str(result.result)] = result + else: + Requests.odd_requests[str(result.result)] = result + + +def third_method(): + for number in range(10): + result = Result(result=randint(0, 100)) + Requests.requests[str(number)] = result + if result.result % 2 == 0: + Requests.even_requests[str(result.result)] = result + else: + Requests.odd_requests[str(result.result)] = result + + futures = [] + with ThreadPoolExecutor(32) as executor: + for _, request in Requests.requests.items(): + future = executor.submit(add_one_result, request) + future.add_done_callback(add_one_fut) + futures.append(future) + + print("Done waiting!") + print(Requests.requests) + print("odd:", Requests.odd_requests) + print("even:", Requests.even_requests) + print("All done!") + + def main(): print(timeit.timeit(first_method, number=1)) print(results) @@ -41,6 +100,8 @@ def main(): print(results) print("All done!") + third_method() + if __name__ == "__main__": main()