]> code.communitydata.science - rises_declines_wikia_code.git/blob - mediawiki_dump_tools/Mediawiki-Utilities/mw/xml_dump/processor.py
Initial commit
[rises_declines_wikia_code.git] / mediawiki_dump_tools / Mediawiki-Utilities / mw / xml_dump / processor.py
1 import logging
2 import traceback
3 from collections import namedtuple
4 from multiprocessing import Process
5 from queue import Empty
6
7 from .functions import open_file
8 from .iteration import Iterator
9
# Default logger for this module; Processor accepts a replacement via its
# ``logger`` argument.
logger = logging.getLogger("mw.dump.processor")

# Envelope for everything placed on a Processor's output queue.  ``error`` is
# False when ``item`` is a regular output (or the DONE sentinel) and True when
# ``item`` is an ``(exception, path)`` pair describing a failed dump.
ErrorItem = namedtuple("ErrorItem", ("error", "item"))


class DONE:
    """Sentinel class sent as the final item of a Processor's output."""
15
16
class Processor(Process):
    """Worker process that drains a queue of dump paths.

    Each path taken from ``pathq`` is opened, wrapped in an ``Iterator`` and
    handed to ``process_dump(dump, path)``.  Every value produced by that call
    is put on ``outputq`` as ``ErrorItem(False, value)``.  If opening or
    processing a dump raises, ``ErrorItem(True, (error, path))`` is put on
    ``outputq`` instead and the worker carries on with the next path.

    When ``pathq`` has nothing left (a non-blocking ``get`` raises ``Empty``)
    the worker stops.  However the worker ends, ``ErrorItem(False, DONE)`` is
    put on ``outputq`` last.

    :Parameters:
        pathq : queue
            Source of dump file paths; read with ``get(block=False)``.
        outputq : queue
            Destination for ``ErrorItem`` results.
        process_dump : callable
            Called as ``process_dump(dump, path)``; must return an iterable.
        logger : logging.Logger
            Logger used for all progress and error messages.
    """

    def __init__(self, pathq, outputq, process_dump, logger=logger):
        super().__init__()
        self.pathq = pathq
        self.outputq = outputq
        self.process_dump = process_dump
        self.logger = logger

    def run(self):
        """Process paths from ``pathq`` until it is empty, then emit DONE."""
        try:
            while True:
                # Force the queue to reset & behave reasonably.
                # NOTE(review): workaround kept from the original code; the
                # returned size is deliberately discarded.
                self.pathq.qsize()
                path = self.pathq.get(block=False)
                self._process_path(path)
        except Empty:
            self.logger.info("Nothing left to do.  Shutting down thread.")
        finally:
            # Always tell the consumer that this worker has finished, even
            # when an unexpected exception is about to end the process.
            self.outputq.put(ErrorItem(False, DONE))

    def _process_path(self, path):
        """Open and process one dump, reporting any failure on ``outputq``."""
        try:
            # Opening happens inside the ``try`` so that an unreadable dump is
            # reported as an ErrorItem instead of terminating the worker and
            # abandoning the paths that are still queued.
            dump = Iterator.from_file(open_file(path))
            self.logger.info("Beginning to process {0}.".format(repr(path)))
            for out in self.process_dump(dump, path):
                self.outputq.put(ErrorItem(False, out))
        except Exception as error:
            self.outputq.put(ErrorItem(True, (error, path)))

            self.logger.error(
                "Failed while processing dump " +
                "{0}: {1}".format(repr(path),
                                  "\n" + traceback.format_exc()))

Community Data Science Collective || Want to submit a patch?