From d5282858288c649b4181f210548065904fd7018c Mon Sep 17 00:00:00 2001 From: Roland Thomas Date: Sun, 12 May 2024 22:40:34 -0400 Subject: [PATCH] Add thread_pool_patterns --- .../asyncio_with_thread_pool.py | 9 ++-- thread_pool_patterns/map_and_wait.py | 33 ++++++++++++++ .../submit_use_as_completed.py | 22 +++++++++ thread_pool_patterns/submit_use_callback.py | 45 +++++++++++++++++++ .../submit_use_sequentially.py | 22 +++++++++ thread_pool_patterns/submit_wait_first.py | 22 +++++++++ thread_pool_patterns/submit_wait_for_all.py | 39 ++++++++++++++++ 7 files changed, 189 insertions(+), 3 deletions(-) rename thread_pool.py => thread_pool_patterns/asyncio_with_thread_pool.py (81%) create mode 100755 thread_pool_patterns/map_and_wait.py create mode 100755 thread_pool_patterns/submit_use_as_completed.py create mode 100755 thread_pool_patterns/submit_use_callback.py create mode 100755 thread_pool_patterns/submit_use_sequentially.py create mode 100755 thread_pool_patterns/submit_wait_first.py create mode 100755 thread_pool_patterns/submit_wait_for_all.py diff --git a/thread_pool.py b/thread_pool_patterns/asyncio_with_thread_pool.py similarity index 81% rename from thread_pool.py rename to thread_pool_patterns/asyncio_with_thread_pool.py index fe10159..29a1327 100755 --- a/thread_pool.py +++ b/thread_pool_patterns/asyncio_with_thread_pool.py @@ -15,7 +15,8 @@ import time def blocking_function(): - time.sleep(2) + print("Blocking function started!") + time.sleep(5) return "Blocking function completed!" @@ -29,7 +30,7 @@ async def main(): loop = asyncio.get_event_loop() executor = ThreadPoolExecutor() - async_task = asyncio.create_task(async_function()) + async_task = loop.create_task(async_function()) result = await loop.run_in_executor(executor, blocking_function) print(result) @@ -38,4 +39,6 @@ async def main(): if __name__ == "__main__": - asyncio.run(main()) + loop = asyncio.get_event_loop() + main_task = loop.create_task(main()) + loop.run_until_complete(main_task) diff --git a/thread_pool_patterns/map_and_wait.py b/thread_pool_patterns/map_and_wait.py new file mode 100755 index 0000000..d12a05a --- /dev/null +++ b/thread_pool_patterns/map_and_wait.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +from concurrent.futures import ThreadPoolExecutor +from random import randint +from time import sleep + + +def add_one(number): + sleep(randint(0, 2)) + return number + 1 + + +def add_one_print(number): + sleep(randint(0, 2)) + print(number) + return number + 1 + + +def main(): + with ThreadPoolExecutor() as executor: + for result in executor.map(add_one, [1, 2, 3, 4, 5]): + print(result) + print("All done!\n") + + with ThreadPoolExecutor() as executor: + returned_generator = executor.map(add_one_print, [1, 2, 3, 4, 5]) + print("Results in order\n") + for result in returned_generator: + print(result) + print("All done!") + + +if __name__ == "__main__": + main() diff --git a/thread_pool_patterns/submit_use_as_completed.py b/thread_pool_patterns/submit_use_as_completed.py new file mode 100755 index 0000000..e62bf84 --- /dev/null +++ b/thread_pool_patterns/submit_use_as_completed.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +from time import sleep +from random import randint +from concurrent.futures import ThreadPoolExecutor, as_completed + + +def add_one(number): + print(number) + sleep(randint(0,2)) + return number + 1 + + +def main(): + with ThreadPoolExecutor(32) as executor: + futures = [executor.submit(add_one, number) for number in range(10)] + print(futures) + for future in as_completed(futures): + print(future.result()) + + +if __name__ == "__main__": + main() diff --git a/thread_pool_patterns/submit_use_callback.py b/thread_pool_patterns/submit_use_callback.py new file mode 100755 index 0000000..c2278be --- /dev/null +++ b/thread_pool_patterns/submit_use_callback.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +import timeit +from time import sleep +from random import randint +from concurrent.futures import ThreadPoolExecutor, Future + +results: list[int] = [] + +def add_one(number: int) -> int: + sleep(randint(0,2)) + result = number + 1 + return result + + +def aggregate_results(future: Future): + results.append(future.result()) + + +def first_method(): + with ThreadPoolExecutor(32) as executor: + futures = [executor.submit(add_one, number) for number in range(10)] + for future in futures: + future.add_done_callback(aggregate_results) + + +def second_method(): + futures = [] + with ThreadPoolExecutor(32) as executor: + for number in range(10): + futures.append(executor.submit(add_one, number)) + futures[-1].add_done_callback(aggregate_results) + + +def main(): + print(timeit.timeit(first_method, number=1)) + print(results) + print("All done!\n") + results.clear() + print(timeit.timeit(second_method, number=1)) + print(results) + print("All done!") + + +if __name__ == "__main__": + main() diff --git a/thread_pool_patterns/submit_use_sequentially.py b/thread_pool_patterns/submit_use_sequentially.py new file mode 100755 index 0000000..436b240 --- /dev/null +++ b/thread_pool_patterns/submit_use_sequentially.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +from time import sleep +from random import randint +from concurrent.futures import ThreadPoolExecutor + + +def add_one(number): + print(number) + sleep(randint(0,2)) + return number + 1 + + +def main(): + with ThreadPoolExecutor(32) as executor: + futures = [executor.submit(add_one, number) for number in range(10)] + print(futures) + for future in futures: + print(future.result()) + + +if __name__ == "__main__": + main() diff --git a/thread_pool_patterns/submit_wait_first.py b/thread_pool_patterns/submit_wait_first.py new file mode 100755 index 0000000..c8707ca --- /dev/null +++ b/thread_pool_patterns/submit_wait_first.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +from time import sleep +from random import randint +from concurrent.futures import ThreadPoolExecutor, Future, wait, FIRST_COMPLETED + +results: list[int] = [] + +def add_one(number: int) -> int: + sleep(randint(0,2)) + return number + 1 + + +def main() -> None: + executor = ThreadPoolExecutor(32) + futures: list[Future] = [executor.submit(add_one, number) for number in range(10)] + done, not_done = wait(futures, return_when=FIRST_COMPLETED) + executor.shutdown(wait=False, cancel_futures=True) + print(done.pop().result()) + + +if __name__ == "__main__": + main() diff --git a/thread_pool_patterns/submit_wait_for_all.py b/thread_pool_patterns/submit_wait_for_all.py new file mode 100755 index 0000000..1185054 --- /dev/null +++ b/thread_pool_patterns/submit_wait_for_all.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +from time import sleep +from random import randint +from concurrent.futures import ThreadPoolExecutor, wait, Future + +results: list[int] = [] + +def add_one(number: int) -> int: + print(number) + sleep(randint(0,2)) + return number + 1 + + +def main() -> None: + executor = ThreadPoolExecutor(32) + futures: list[Future] = [executor.submit(add_one, number) for number in range(10)] + wait(futures) + print("Done waiting!") + for future in futures: + print(future.result()) + print("All done!") + + futures: list[Future] = [executor.submit(add_one, number) for number in range(10)] + executor.shutdown() + print("Done waiting!") + for future in futures: + print(future.result()) + print("All done!") + + with ThreadPoolExecutor() as executor: + futures: list[Future] = [executor.submit(add_one, number) for number in range(10)] + print("Done waiting!") + for future in futures: + print(future.result()) + print("All done!") + + +if __name__ == "__main__": + main()