130 lines
4.4 KiB
Python
130 lines
4.4 KiB
Python
|
#!/usr/bin/env python
|
||
|
""" progress_bar.py
|
||
|
This module demonstrates how to use the prompt_toolkit ProgressBar to display
|
||
|
the progress of a list of tasks. The progress bar is only updated when tasks
|
||
|
are done, not when started.
|
||
|
|
||
|
Tasks use ThreadPoolExecutor for concurrency.
|
||
|
"""
|
||
|
import time
|
||
|
import os
|
||
|
import signal
|
||
|
from concurrent.futures import as_completed, ThreadPoolExecutor
|
||
|
from random import randint
|
||
|
from prompt_toolkit.shortcuts import ProgressBar
|
||
|
from prompt_toolkit.key_binding import KeyBindings
|
||
|
from prompt_toolkit.patch_stdout import patch_stdout
|
||
|
import requests
|
||
|
from threading import Lock
|
||
|
|
||
|
from in_put import confirm, style, custom_formatters, get_toolbar
|
||
|
|
||
|
|
||
|
class Forge:
    """Run a batch of fast and slow HTTP tasks behind prompt_toolkit progress bars.

    Tasks are submitted to a ThreadPoolExecutor. A bar advances only when a
    task and its done-callback have finished successfully, not when the task
    is started. Tasks that fail are counted separately so that ``run`` always
    terminates.

    Key bindings while the bars are shown:
        q -- stop waiting and cancel the tasks that have not started yet
        f -- print "f"
        x -- send SIGINT to this process

    Class attributes:
        kb: key bindings handed to the ProgressBar.
        cancel: set to True by the "q" binding and polled by ``run``.
        base_url: server exposing the ``/fast`` and ``/slow`` endpoints.
        request_timeout: seconds after which an HTTP request is abandoned.
    """

    kb = KeyBindings()
    cancel = False
    base_url = "http://localhost:5597"
    request_timeout = 30

    def __init__(self):
        # Guards every counter below; they are written from worker threads
        # (done-callbacks) and read from the thread that calls run().
        self.lock = Lock()
        self.tasks_done = 0
        self.fast_tasks_done = 0
        self.slow_tasks_done = 0
        # Tasks whose request or callback failed (or that were cancelled).
        self.fast_tasks_failed = 0
        self.slow_tasks_failed = 0
        self.fast_task_count = randint(50, 400)
        self.slow_task_count = randint(50, 100)
        self.total_tasks = self.fast_task_count + self.slow_task_count

    @kb.add("q")
    def _(event):
        # The handler has no instance to work with (kb is class-level), so the
        # flag lives on the class. A bare ``cancel = True`` would only create a
        # local variable and never be seen by run().
        Forge.cancel = True

    @kb.add("f")
    def _(event):
        print("f")

    @kb.add("x")
    def _(event):
        # Deliver SIGINT to this process.
        os.kill(os.getpid(), signal.SIGINT)

    def save_data(self, future):
        """Done-callback for slow tasks: append the result to ``data.txt``.

        Args:
            future: the finished (or cancelled) future of a ``slow_task`` call.

        The task is counted in ``slow_tasks_done`` on success and in
        ``slow_tasks_failed`` otherwise, so every task is accounted for even
        when the request or the file write raised. Exceptions still propagate
        to the caller of the callback.
        """
        saved = False
        try:
            if not future.cancelled():
                data = future.result()
                # Serialise appends coming from different worker threads.
                with self.lock:
                    with open("data.txt", "a") as file:
                        file.write(f"{data}\n")
                # NOTE(review): presumably simulates a slow save -- confirm.
                time.sleep(randint(1, 3))
                saved = True
        finally:
            with self.lock:
                if saved:
                    self.slow_tasks_done += 1
                else:
                    self.slow_tasks_failed += 1

    def send_data(self, future):
        """Done-callback for fast tasks: POST the result back to ``/fast``.

        Args:
            future: the finished (or cancelled) future of a ``fast_task`` call.

        The task is counted in ``fast_tasks_done`` only when the server
        answers ``"Data saved!"``; any other outcome (different reply, request
        error, cancelled future) is counted in ``fast_tasks_failed``.
        """
        sent = False
        try:
            if not future.cancelled():
                data = future.result()
                result = requests.post(
                    f"{self.base_url}/fast",
                    data=data,
                    timeout=self.request_timeout,
                )
                sent = result.text == "Data saved!"
        finally:
            with self.lock:
                if sent:
                    self.fast_tasks_done += 1
                else:
                    self.fast_tasks_failed += 1

    def fast_task(self):
        """GET ``/fast`` and return the response body as text."""
        result = requests.get(
            f"{self.base_url}/fast", timeout=self.request_timeout
        )
        return result.text

    def slow_task(self):
        """GET ``/slow`` and return the response body as text."""
        result = requests.get(
            f"{self.base_url}/slow", timeout=self.request_timeout
        )
        return result.text

    def run(self):
        """Submit all tasks and show one progress bar per task group.

        Three bars are displayed: all tasks, slow tasks and fast tasks. The
        bars are refreshed from the shared counters about every 0.1 s and are
        only advanced when tasks are done, not when they are started. The loop
        ends when every task has either succeeded or failed, or when "q" was
        pressed. Afterwards the user is asked whether ``data.txt`` should be
        printed.
        """
        # A "q" from a previous run must not abort this one.
        Forge.cancel = False
        slow_futures = []
        fast_futures = []

        # Print number of fast, slow, and total tasks
        print(f"Fast tasks: {self.fast_task_count}")
        print(f"Slow tasks: {self.slow_task_count}")
        print(f"Total tasks: {self.total_tasks}")

        # The context manager guarantees shutdown() even if the loop below is
        # interrupted (e.g. by the SIGINT sent from the "x" binding).
        with ThreadPoolExecutor() as executor:
            try:
                with patch_stdout():
                    with ProgressBar(
                        title="Forge",
                        formatters=custom_formatters,
                        style=style,
                        bottom_toolbar=get_toolbar,
                        key_bindings=self.kb,
                    ) as pb:
                        task_progress = pb(
                            range(self.total_tasks), label="Tasks"
                        )
                        slow_progress = pb(
                            range(self.slow_task_count), label="Slow tasks"
                        )
                        for _ in range(self.slow_task_count):
                            future = executor.submit(self.slow_task)
                            slow_futures.append(future)
                            future.add_done_callback(self.save_data)

                        fast_progress = pb(
                            range(self.fast_task_count), label="Fast tasks"
                        )
                        for _ in range(self.fast_task_count):
                            future = executor.submit(self.fast_task)
                            fast_futures.append(future)
                            future.add_done_callback(self.send_data)

                        while not self.cancel:
                            time.sleep(0.1)
                            # Take one consistent snapshot of the counters.
                            with self.lock:
                                slow_done = self.slow_tasks_done
                                fast_done = self.fast_tasks_done
                                slow_settled = slow_done + self.slow_tasks_failed
                                fast_settled = fast_done + self.fast_tasks_failed
                                self.tasks_done = slow_done + fast_done

                            slow_progress.items_completed = slow_done
                            fast_progress.items_completed = fast_done
                            task_progress.items_completed = slow_done + fast_done

                            # A group is finished once every task has settled,
                            # successfully or not; waiting for successes only
                            # would loop forever after a single failure.
                            if fast_settled >= self.fast_task_count:
                                fast_progress.done = True
                            if slow_settled >= self.slow_task_count:
                                slow_progress.done = True
                            if fast_progress.done and slow_progress.done:
                                task_progress.done = True
                                break
            finally:
                # Drop work that has not started so that shutdown only waits
                # for tasks already running. No-op for finished futures.
                for future in slow_futures + fast_futures:
                    future.cancel()

        result = confirm("Do you want to print the data?")

        if result:
            try:
                with open("data.txt", "r") as file:
                    print(file.read())
            except FileNotFoundError:
                # Nothing was ever saved (all slow tasks failed or were cancelled).
                print("No data has been saved yet.")
|
||
|
|
||
|
def main():
    """Create a Forge instance and run its task demo."""
    Forge().run()
|
||
|
|
||
|
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|