3 from collections import namedtuple
4 from multiprocessing import Process
5 from queue import Empty
7 from .functions import open_file
8 from .iteration import Iterator
10 logger = logging.getLogger("mw.dump.processor")
12 ErrorItem = namedtuple("ErrorItem", ['error', 'item'])
17 class Processor(Process):
18 def __init__(self, pathq, outputq, process_dump, logger=logger):
20 self.outputq = outputq
21 self.process_dump = process_dump
23 Process.__init__(self)
29 # Force the queue to reset & behave reasonably
30 foo = self.pathq.qsize()
31 path = self.pathq.get(block=False)
32 dump = Iterator.from_file(open_file(path))
33 logger.info("Beginning to process {0}.".format(repr(path)))
35 for out in self.process_dump(dump, path):
36 self.outputq.put(ErrorItem(False, out))
37 except Exception as error:
39 self.outputq.put(ErrorItem(True, (error, path)))
42 "Failed while processing dump " +
43 "{0}: {1}".format(repr(path),
44 "\n" + traceback.format_exc()))
46 self.logger.info("Nothing left to do. Shutting down thread.")
48 self.outputq.put(ErrorItem(False, DONE))