]> code.communitydata.science - rises_declines_wikia_code.git/blob - mediawiki_dump_tools/Mediawiki-Utilities/mw/xml_dump/map.py
add copy of the GPL
[rises_declines_wikia_code.git] / mediawiki_dump_tools / Mediawiki-Utilities / mw / xml_dump / map.py
1 import logging
2 from multiprocessing import Queue, Value, cpu_count
3 from queue import Empty
4
5 from .functions import file
6 from .processor import DONE, Processor
7
8 logger = logging.getLogger("mw.xml_dump.map")
9
10
11 def re_raise(error, path):
12     raise error
13
14
15
16 def map(paths, process_dump, handle_error=re_raise,
17         threads=cpu_count(), output_buffer=100):
18     """
19     Maps a function across a set of dump files and returns
20     an (order not guaranteed) iterator over the output.
21
22     The `process_dump` function must return an iterable object (such as a
23     generator).  If your process_dump function does not need to produce
24     output, make it return an empty `iterable` upon completion (like an empty
25     list).
26
27     :Parameters:
28         paths : iter( str )
29             a list of paths to dump files to process
30         process_dump : function( dump : :class:`~mw.xml_dump.Iterator`, path : str)
31             a function to run on every :class:`~mw.xml_dump.Iterator`
32         threads : int
33             the number of individual processing threads to spool up
34         output_buffer : int
35             the maximum number of output values to buffer.
36
37     :Returns:
38         An iterator over values yielded by calls to `process_dump()`
39     :Example:
40         .. code-block:: python
41
42             from mw import xml_dump
43
44             files = ["examples/dump.xml", "examples/dump2.xml"]
45
46             def page_info(dump, path):
47                 for page in dump:
48
49                     yield page.id, page.namespace, page.title
50
51
52             for page_id, page_namespace, page_title in xml_dump.map(files, page_info):
53                 print("\t".join([str(page_id), str(page_namespace), page_title]))
54     """
55     paths = list(paths)
56     pathsq = queue_files(paths)
57     outputq = Queue(maxsize=output_buffer)
58     running = Value('i', 0)
59     threads = max(1, min(int(threads), pathsq.qsize()))
60
61     processors = []
62
63     for i in range(0, threads):
64         processor = Processor(
65             pathsq,
66             outputq,
67             process_dump
68         )
69         processor.start()
70         processors.append(processor)
71
72     # output while processes are running
73     done = 0
74     while done < len(paths):
75         try:
76             error, item = outputq.get(timeout=.25)
77         except Empty:
78             continue
79
80         if not error:
81             if item is DONE:
82                 done += 1
83             else:
84                 yield item
85         else:
86             error, path = item
87             re_raise(error, path)
88
89 def queue_files(paths):
90     """
91     Produces a `multiprocessing.Queue` containing path for each value in
92     `paths` to be used by the `Processor`s.
93
94     :Parameters:
95         paths : iterable
96             the paths to add to the processing queue
97     """
98     q = Queue()
99     for path in paths:
100         q.put(file(path))
101     return q

Community Data Science Collective || Want to submit a patch?