2 from multiprocessing import Queue, Value, cpu_count
3 from queue import Empty
5 from .functions import file
6 from .processor import DONE, Processor
8 logger = logging.getLogger("mw.xml_dump.map")
def re_raise(error, path):
    """
    Default `handle_error` callback for :func:`map`: propagates the error
    raised by a worker process instead of swallowing it.

    :Parameters:
        error : Exception
            the exception raised while processing a dump file
        path : str
            the path of the dump file being processed when the error
            occurred (unused here; provided for custom handlers that want
            to log or skip the offending file)
    """
    raise error
16 def map(paths, process_dump, handle_error=re_raise,
17 threads=cpu_count(), output_buffer=100):
# NOTE(review): the lines below are a docstring fragment — the opening and
# closing triple quotes fall on lines elided from this extract.
19 Maps a function across a set of dump files and returns
20 an (order not guaranteed) iterator over the output.
22 The `process_dump` function must return an iterable object (such as a
23 generator). If your process_dump function does not need to produce
24 output, make it return an empty `iterable` upon completion (like an empty
# (parameter documentation, partially elided in this extract)
29 a list of paths to dump files to process
30 process_dump : function( dump : :class:`~mw.xml_dump.Iterator`, path : str)
31 a function to run on every :class:`~mw.xml_dump.Iterator`
33 the number of individual processing threads to spool up
35 the maximum number of output values to buffer.
38 An iterator over values yielded by calls to `process_dump()`
# usage example from the docstring
40 .. code-block:: python
42 from mw import xml_dump
44 files = ["examples/dump.xml", "examples/dump2.xml"]
46 def page_info(dump, path):
49 yield page.id, page.namespace, page.title
52 for page_id, page_namespace, page_title in xml_dump.map(files, page_info):
53 print("\t".join([str(page_id), str(page_namespace), page_title]))
# Build the work queue of input paths, a bounded queue for worker output,
# and a shared integer tracking how many workers are running.
56 pathsq = queue_files(paths)
57 outputq = Queue(maxsize=output_buffer)
58 running = Value('i', 0)
# Spawn at least one worker, but never more workers than there are files.
59 threads = max(1, min(int(threads), pathsq.qsize()))
# Start one Processor per worker slot. NOTE(review): the `processors`
# list initialization and the Processor constructor arguments fall on
# elided lines — presumably pathsq/outputq/process_dump; confirm against
# the full source.
63 for i in range(0, threads):
64 processor = Processor(
70 processors.append(processor)
72 # output while processes are running
# NOTE(review): `done` is initialized on an elided line; each completed
# path presumably increments it (the DONE sentinel from .processor is
# imported above) — confirm.
74 while done < len(paths):
# Poll with a short timeout so the loop can notice worker state changes;
# `queue.Empty` (imported above) is presumably caught on an elided line.
76 error, item = outputq.get(timeout=.25)
89 def queue_files(paths):
# NOTE(review): docstring fragment — the triple quotes and the function
# body (building and filling the multiprocessing.Queue) fall outside this
# extract; do not assume behavior beyond what the docstring states.
91 Produces a `multiprocessing.Queue` containing path for each value in
92 `paths` to be used by the `Processor`s.
96 the paths to add to the processing queue