From 90ba300405209f452dec8c2125c8df93af3a1463 Mon Sep 17 00:00:00 2001 From: benne238 Date: Mon, 12 Jul 2021 09:38:05 -0400 Subject: [PATCH 1/7] import multiprocessing package in queue.py --- src/webqueue2api/parser/queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py index 36c22eb..ff5e0cc 100644 --- a/src/webqueue2api/parser/queue.py +++ b/src/webqueue2api/parser/queue.py @@ -1,3 +1,4 @@ +import multiprocessing import os, re from pathlib import Path from .item import Item From d8e722bb9aeefa9066db1661f01789739093116f Mon Sep 17 00:00:00 2001 From: benne238 Date: Mon, 12 Jul 2021 09:42:07 -0400 Subject: [PATCH 2/7] Added logic to parse multiple items at once in a given queue using the starmap_async function in the multiprocessing package --- src/webqueue2api/parser/queue.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py index ff5e0cc..79eff5e 100644 --- a/src/webqueue2api/parser/queue.py +++ b/src/webqueue2api/parser/queue.py @@ -74,6 +74,8 @@ def __get_items(self, headers_only: bool) -> list: list: a list of items for this Queue """ items = [] + valid_items = [] + multi_item_processes = multiprocessing.Pool(processes=multiprocessing.cpu_count()) for item in os.listdir(self.path): item_path = Path(self.path, item) @@ -81,7 +83,9 @@ def __get_items(self, headers_only: bool) -> list: is_file = True if os.path.isfile(item_path) else False if is_file and is_valid_item_name(item): - items.append(Item(self.name, item, headers_only)) + valid_items.append(item) + + items = multi_item_processes.starmap_async(Item, [(self.name, item, headers_only) for item in valid_items]).get() return items From 5a30f2dd4b072a833662f279705946810c4eb492 Mon Sep 17 00:00:00 2001 From: benne238 Date: Mon, 12 Jul 2021 09:43:21 -0400 Subject: [PATCH 3/7] Added code to wait until all of the items are loaded before returning the list of items --- src/webqueue2api/parser/queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py index 79eff5e..60fece4 100644 --- a/src/webqueue2api/parser/queue.py +++ b/src/webqueue2api/parser/queue.py @@ -86,6 +86,8 @@ def __get_items(self, headers_only: bool) -> list: valid_items.append(item) items = multi_item_processes.starmap_async(Item, [(self.name, item, headers_only) for item in valid_items]).get() + multi_item_processes.close() + multi_item_processes.join() return items From 55cb9ad3ba510ba26e711f844d42ed973163bb97 Mon Sep 17 00:00:00 2001 From: benne238 Date: Mon, 12 Jul 2021 09:57:36 -0400 Subject: [PATCH 4/7] Modified load_queues function in queue.py to accept *args (strings that represent the different queues) and a headers_only boolean value. Modified the docstring accordingly --- src/webqueue2api/parser/queue.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py index 60fece4..02a6d90 100644 --- a/src/webqueue2api/parser/queue.py +++ b/src/webqueue2api/parser/queue.py @@ -185,11 +185,18 @@ def get_queue_counts() -> list: return sorted_queue_info -def load_queues() -> list: - """Return a list of Queues for each queue. +def load_queues(*queues, headers_only: bool = True) -> list: + """Returns a list of queues + + Example: + [example] + + Args: + headers_only (bool, optional): Weather or not the content of items in the queue should be loaded. Defaults to True. + *queues: List of strings that represent Queue names. Returns: - list: list of Queues for each queue. + list: A list of all the queues that were given as arguments, or all of the queues if no queues were specified """ queues = [] From 81e28e5b4c2a8718db6d27476a0f1f25afb25147 Mon Sep 17 00:00:00 2001 From: benne238 Date: Mon, 12 Jul 2021 10:05:50 -0400 Subject: [PATCH 5/7] created a custom class that allows subprocesses to spawn other subprocesses --- src/webqueue2api/parser/queue.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py index 02a6d90..22b96be 100644 --- a/src/webqueue2api/parser/queue.py +++ b/src/webqueue2api/parser/queue.py @@ -1,4 +1,5 @@ import multiprocessing +import multiprocessing.pool import os, re from pathlib import Path from .item import Item @@ -198,6 +199,20 @@ def load_queues(*queues, headers_only: bool = True) -> list: Returns: list: A list of all the queues that were given as arguments, or all of the queues if no queues were specified """ + # custom class creation based on stackoverflow answer + class NoDaemonProcess(multiprocessing.Process): + # make 'daemon' attribute always return False + def _get_daemon(self): + return False + def _set_daemon(self, value): + pass + daemon = property(_get_daemon, _set_daemon) + + # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool + # because the latter is only a wrapper function, not a proper class. + class MyPool(multiprocessing.pool.Pool): + Process = NoDaemonProcess + queues = [] for queue in get_valid_queues(): From ab565ec2ad42f2d8cc80dc001d70b9a41168cd36 Mon Sep 17 00:00:00 2001 From: benne238 Date: Mon, 12 Jul 2021 10:14:11 -0400 Subject: [PATCH 6/7] added logic to parse multiple queues with multiparsing using the the custom class from the previous comitt that allows subprocesses to spawn other subprocesses --- src/webqueue2api/parser/queue.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py index 22b96be..1ab1d6c 100644 --- a/src/webqueue2api/parser/queue.py +++ b/src/webqueue2api/parser/queue.py @@ -213,9 +213,13 @@ def _set_daemon(self, value): class MyPool(multiprocessing.pool.Pool): Process = NoDaemonProcess - queues = [] - - for queue in get_valid_queues(): - queues.append(Queue(queue)) + if len(queues) == 0: queues_to_load = get_valid_queues() + elif len(queues) == 1: return [Queue(name=queues[0], headers_only=headers_only)] + else: queues_to_load = queues + + multi_queue_process = MyPool(processes=multiprocessing.cpu_count()) + loaded_queues = multi_queue_process.starmap_async(Queue, [(queue, headers_only) for queue in queues_to_load]).get() + multi_queue_process.close() + multi_queue_process.join() - return queues \ No newline at end of file + return loaded_queues \ No newline at end of file From 49ad6e8e838afe1fb906b1d27c24ee3193777686 Mon Sep 17 00:00:00 2001 From: Justin Campbell Date: Mon, 2 Aug 2021 18:19:15 -0400 Subject: [PATCH 7/7] Docs cleanup --- src/webqueue2api/parser/queue.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py index 1ab1d6c..ceb1eb0 100644 --- a/src/webqueue2api/parser/queue.py +++ b/src/webqueue2api/parser/queue.py @@ -186,30 +186,23 @@ def get_queue_counts() -> list: return sorted_queue_info -def load_queues(*queues, headers_only: bool = True) -> list: - """Returns a list of queues - - Example: - [example] +def load_queues(*queues: tuple, headers_only: bool = True) -> list: + """Returns a list of queues. Args: headers_only (bool, optional): Weather or not the content of items in the queue should be loaded. Defaults to True. - *queues: List of strings that represent Queue names. + *queues (tuple): List of strings that represent Queue names. Returns: - list: A list of all the queues that were given as arguments, or all of the queues if no queues were specified + list: A list of all the queues that were given as arguments, or all of the queues if no queues were specified. """ - # custom class creation based on stackoverflow answer + # Custom mutliprocessing classes to allow for nested multi-threading. class NoDaemonProcess(multiprocessing.Process): - # make 'daemon' attribute always return False def _get_daemon(self): return False def _set_daemon(self, value): pass daemon = property(_get_daemon, _set_daemon) - - # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool - # because the latter is only a wrapper function, not a proper class. class MyPool(multiprocessing.pool.Pool): Process = NoDaemonProcess