Skip to content

Commit

Permalink
Merge pull request #45 from ECN/feature-implement-multiparsing-when-l…
Browse files Browse the repository at this point in the history
…oading-multiple-queues-and-items

Add support for multi-threadded Queue loading
  • Loading branch information
campb303 authored Aug 2, 2021
2 parents 5c10453 + 49ad6e8 commit c88dc30
Showing 1 changed file with 35 additions and 9 deletions.
44 changes: 35 additions & 9 deletions src/webqueue2api/parser/queue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import multiprocessing
import multiprocessing.pool
import os, re
from pathlib import Path
from .item import Item
Expand Down Expand Up @@ -77,14 +79,20 @@ def __get_items(self, headers_only: bool) -> list:
list: a list of items for this Queue
"""
items = []
valid_items = []
multi_item_processes = multiprocessing.Pool(processes=multiprocessing.cpu_count())

for item in os.listdir(self.path):
item_path = Path(self.path, item)

is_file = True if os.path.isfile(item_path) else False

if is_file and is_valid_item_name(item):
items.append(Item(self.name, item, headers_only))
valid_items.append(item)

items = multi_item_processes.starmap_async(Item, [(self.name, item, headers_only) for item in valid_items]).get()
multi_item_processes.close()
multi_item_processes.join()

return items

Expand Down Expand Up @@ -181,15 +189,33 @@ def get_queue_counts() -> list:

return sorted_queue_info

def load_queues() -> list:
"""Return a list of Queues for each queue.
def load_queues(*queues: tuple, headers_only: bool = True) -> list:
"""Returns a list of queues.
Args:
headers_only (bool, optional): Weather or not the content of items in the queue should be loaded. Defaults to True.
*queues (tuple): List of strings that represent Queue names.
Returns:
list: list of Queues for each queue.
list: A list of all the queues that were given as arguments, or all of the queues if no queues were specified.
"""
queues = []

for queue in get_valid_queues():
queues.append(Queue(queue))
# Custom mutliprocessing classes to allow for nested multi-threading.
class NoDaemonProcess(multiprocessing.Process):
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess

if len(queues) == 0: queues_to_load = get_valid_queues()
elif len(queues) == 1: return [Queue(name=queues[0], headers_only=headers_only)]
else: queues_to_load = queues

multi_queue_process = MyPool(processes=multiprocessing.cpu_count())
loaded_queues = multi_queue_process.starmap_async(Queue, [(queue, headers_only) for queue in queues_to_load]).get()
multi_queue_process.close()
multi_queue_process.join()

return queues
return loaded_queues

0 comments on commit c88dc30

Please sign in to comment.