diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py index 132d75f..4a54dfa 100644 --- a/src/webqueue2api/parser/queue.py +++ b/src/webqueue2api/parser/queue.py @@ -1,3 +1,5 @@ +import multiprocessing +import multiprocessing.pool import os, re from pathlib import Path from .item import Item @@ -77,6 +79,8 @@ def __get_items(self, headers_only: bool) -> list: list: a list of items for this Queue """ items = [] + valid_items = [] + multi_item_processes = multiprocessing.Pool(processes=multiprocessing.cpu_count()) for item in os.listdir(self.path): item_path = Path(self.path, item) @@ -84,7 +88,11 @@ def __get_items(self, headers_only: bool) -> list: is_file = True if os.path.isfile(item_path) else False if is_file and is_valid_item_name(item): - items.append(Item(self.name, item, headers_only)) + valid_items.append(item) + + items = multi_item_processes.starmap_async(Item, [(self.name, item, headers_only) for item in valid_items]).get() + multi_item_processes.close() + multi_item_processes.join() return items @@ -181,15 +189,33 @@ def get_queue_counts() -> list: return sorted_queue_info -def load_queues() -> list: - """Return a list of Queues for each queue. +def load_queues(*queues: tuple, headers_only: bool = True) -> list: + """Returns a list of queues. + + Args: + headers_only (bool, optional): Weather or not the content of items in the queue should be loaded. Defaults to True. + *queues (tuple): List of strings that represent Queue names. Returns: - list: list of Queues for each queue. + list: A list of all the queues that were given as arguments, or all of the queues if no queues were specified. """ - queues = [] - - for queue in get_valid_queues(): - queues.append(Queue(queue)) + # Custom mutliprocessing classes to allow for nested multi-threading. + class NoDaemonProcess(multiprocessing.Process): + def _get_daemon(self): + return False + def _set_daemon(self, value): + pass + daemon = property(_get_daemon, _set_daemon) + class MyPool(multiprocessing.pool.Pool): + Process = NoDaemonProcess + + if len(queues) == 0: queues_to_load = get_valid_queues() + elif len(queues) == 1: return [Queue(name=queues[0], headers_only=headers_only)] + else: queues_to_load = queues + + multi_queue_process = MyPool(processes=multiprocessing.cpu_count()) + loaded_queues = multi_queue_process.starmap_async(Queue, [(queue, headers_only) for queue in queues_to_load]).get() + multi_queue_process.close() + multi_queue_process.join() - return queues \ No newline at end of file + return loaded_queues \ No newline at end of file