]> code.communitydata.science - rises_declines_wikia_code.git/blob - mediawiki_dump_tools/Mediawiki-Utilities/mw/xml_dump/functions.py
add copy of the GPL
[rises_declines_wikia_code.git] / mediawiki_dump_tools / Mediawiki-Utilities / mw / xml_dump / functions.py
1 import os
2 import re
3 import subprocess
4
5 from .errors import FileTypeError
6
7 EXTENSIONS = {
8     'xml': ["cat"],
9     'gz': ["zcat"],
10     'bz2': ["bzcat"],
11     '7z': ["7z", "e", "-so"],
12     'lzma': ["lzcat"]
13 }
14 """
15 A map from file extension to the command to run to extract the data to standard out.
16 """
17
18 EXT_RE = re.compile(r'\.([^\.]+)$')
19 """
20 A regular expression for extracting the final extension of a file.
21 """
22
23
24 def file(path_or_f):
25     """
26     Verifies that a file exists at a given path and that the file has a
27     known extension type.
28
29     :Parameters:
30         path : `str`
31             the path to a dump file
32
33     """
34     if hasattr(path_or_f, "readline"):
35         return path_or_f
36     else:
37         path = path_or_f
38
39     path = os.path.expanduser(path)
40     if not os.path.isfile(path):
41         raise FileTypeError("Can't find file %s" % path)
42
43     match = EXT_RE.search(path)
44     if match is None:
45         raise FileTypeError("No extension found for %s." % path)
46     elif match.groups()[0] not in EXTENSIONS:
47         raise FileTypeError("File type %r is not supported." % path)
48     else:
49         return path
50
51
52 def open_file(path_or_f):
53     """
54     Turns a path to a dump file into a file-like object of (decompressed)
55     XML data.
56
57     :Parameters:
58         path : `str`
59             the path to the dump file to read
60     """
61     if hasattr(path_or_f, "read"):
62         return path_or_f
63     else:
64         path = path_or_f
65
66     match = EXT_RE.search(path)
67     ext = match.groups()[0]
68     p = subprocess.Popen(
69         EXTENSIONS[ext] + [path],
70         stdout=subprocess.PIPE,
71         stderr=open(os.devnull, "w")
72     )
73     # sys.stderr.write("\n%s %s\n" % (EXTENSIONS[ext], path))
74     # sys.stderr.write(p.stdout.read(1000))
75     # return False
76     return p.stdout

Community Data Science Collective || Want to submit a patch?