]> code.communitydata.science - mediawiki_dump_tools.git/blob - test/Wikiq_Unit_Test.py
cc27fb52b7037a7aa2de214223cb592db5565ccd
[mediawiki_dump_tools.git] / test / Wikiq_Unit_Test.py
1 import unittest\r
2 import os\r
3 import subprocess\r
4 from shutil import copyfile\r
5 import pandas as pd\r
6 from pandas.util.testing import assert_frame_equal\r
7 from io import StringIO\r
8 \r
9 # with / without pwr DONE\r
10 # with / without url encode DONE\r
11 # with / without collapse user DONE\r
12 # with output to sdtout DONE\r
13 # note that the persistence radius is 7 by default\r
14 # reading various file formats including\r
15 #        7z, gz, bz2, xml  DONE\r
16 # wikia and wikipedia data DONE\r
17 # malformed xmls DONE\r
18 \r
19 class Test_Wikipedia(unittest.TestCase):\r
20     def setUp(self):\r
21         if not os.path.exists("test_output"):\r
22             os.mkdir("test_output")\r
23 \r
24         self.wiki = 'ikwiki-20180301-pages-meta-history'\r
25         self.wikiq_out_name =  self.wiki + ".tsv"\r
26         self.test_output_dir = os.path.join(".", "test_output")\r
27         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)\r
28 \r
29         self.infile = "{0}.xml.bz2".format(self.wiki)    \r
30         self.base_call = "../wikiq {0} -o {1}"\r
31         self.input_dir = "dumps"\r
32         self.input_file = os.path.join(".", self.input_dir,self.infile)\r
33         self.baseline_output_dir = "baseline_output"\r
34 \r
35     def test_WP_url_encode(self):\r
36         test_filename =  "url-encode_" + self.wikiq_out_name\r
37         test_file = os.path.join(self.test_output_dir, test_filename)\r
38         if os.path.exists(test_file):\r
39             os.remove(test_file)\r
40         \r
41         call = self.base_call.format(self.input_file, self.test_output_dir)\r
42         call = call + " --url-encode"\r
43         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
44         proc.wait()\r
45 \r
46         copyfile(self.call_output, test_file)\r
47         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
48 \r
49         # as a test let's make sure that we get equal data frames\r
50         test = pd.read_table(test_file)\r
51         baseline = pd.read_table(baseline_file)\r
52         assert_frame_equal(test,baseline)\r
53 \r
54     def test_WP_namespaces(self):\r
55         print(os.path.abspath('.'))\r
56         test_filename =  "namespaces_" + self.wikiq_out_name\r
57         test_file = os.path.join(self.test_output_dir, test_filename)\r
58         if os.path.exists(test_file):\r
59             os.remove(test_file)\r
60         \r
61         call = self.base_call.format(self.input_file, self.test_output_dir)\r
62         call = call + " -n 0 -n 1"\r
63         print(call)\r
64         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
65         proc.wait()\r
66         copyfile(self.call_output, test_file)\r
67         baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)\r
68 \r
69         # as a test let's make sure that we get equal data frames\r
70         test = pd.read_table(test_file)\r
71         num_wrong_ns = sum(~ test.namespace.isin({0,1}))\r
72         self.assertEqual(num_wrong_ns, 0)\r
73         baseline = pd.read_table(baseline_file)\r
74         assert_frame_equal(test,baseline)\r
75 \r
76     def test_WP_revert_radius(self):\r
77         print(os.path.abspath('.'))\r
78         test_filename =  "revert_radius_" + self.wikiq_out_name\r
79         test_file = os.path.join(self.test_output_dir, test_filename)\r
80         if os.path.exists(test_file):\r
81             os.remove(test_file)\r
82         \r
83         call = self.base_call.format(self.input_file, self.test_output_dir)\r
84         call = call + " -n 0 -n 1 -rr 1"\r
85         print(call)\r
86         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
87         proc.wait()\r
88         copyfile(self.call_output, test_file)\r
89         baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)\r
90 \r
91         # as a test let's make sure that we get equal data frames\r
92         test = pd.read_table(test_file)\r
93         num_wrong_ns = sum(~ test.namespace.isin({0,1}))\r
94         self.assertEqual(num_wrong_ns, 0)\r
95         baseline = pd.read_table(baseline_file)\r
96         assert_frame_equal(test,baseline)\r
97 \r
98 \r
99 \r
100 class Test_Basic(unittest.TestCase):\r
101 \r
102     def setUp(self):\r
103         if not os.path.exists("test_output"):\r
104             os.mkdir("test_output")\r
105 \r
106         self.wiki = 'sailormoon'\r
107         self.wikiq_out_name =  self.wiki + ".tsv"\r
108         self.test_output_dir = os.path.join(".", "test_output")\r
109         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)\r
110 \r
111         self.infile = "{0}.xml.7z".format(self.wiki)\r
112         self.base_call = "../wikiq {0} -o {1}"\r
113         self.input_dir = "dumps"\r
114         self.input_file = os.path.join(".", self.input_dir,self.infile)\r
115         self.baseline_output_dir = "baseline_output"\r
116 \r
117     def test_noargs(self):\r
118 \r
119         test_filename =  "noargs_" + self.wikiq_out_name\r
120         test_file = os.path.join(self.test_output_dir, test_filename)\r
121         if os.path.exists(test_file):\r
122             os.remove(test_file)\r
123         \r
124         call = self.base_call.format(self.input_file, self.test_output_dir)\r
125         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
126         proc.wait()\r
127 \r
128         copyfile(self.call_output, test_file)\r
129 \r
130         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
131 \r
132         test = pd.read_table(test_file)\r
133         baseline = pd.read_table(baseline_file)\r
134         assert_frame_equal(test,baseline)\r
135 \r
136 \r
137     def test_collapse_user(self):\r
138         test_filename =  "collapse-user_" + self.wikiq_out_name\r
139         test_file = os.path.join(self.test_output_dir, test_filename)\r
140         if os.path.exists(test_file):\r
141             os.remove(test_file)\r
142         \r
143         call = self.base_call.format(self.input_file, self.test_output_dir)\r
144         call = call + " --collapse-user"\r
145 \r
146         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
147         proc.wait()\r
148 \r
149         copyfile(self.call_output, test_file)\r
150 \r
151         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
152         test = pd.read_table(test_file)\r
153         baseline = pd.read_table(baseline_file)\r
154         assert_frame_equal(test,baseline)\r
155 \r
156     def test_pwr_segment(self):\r
157         test_filename =  "persistence_segment_" + self.wikiq_out_name\r
158         test_file = os.path.join(self.test_output_dir, test_filename)\r
159         if os.path.exists(test_file):\r
160             os.remove(test_file)\r
161         \r
162         call = self.base_call.format(self.input_file, self.test_output_dir)\r
163         call = call + " --persistence segment"\r
164         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
165         proc.wait()\r
166 \r
167 \r
168         copyfile(self.call_output, test_file)\r
169 \r
170         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
171 \r
172         test = pd.read_table(test_file)\r
173         baseline = pd.read_table(baseline_file)\r
174         assert_frame_equal(test,baseline)\r
175 \r
176     def test_pwr_legacy(self):\r
177         test_filename =  "persistence_legacy_" + self.wikiq_out_name\r
178         test_file = os.path.join(self.test_output_dir, test_filename)\r
179         if os.path.exists(test_file):\r
180             os.remove(test_file)\r
181         \r
182         call = self.base_call.format(self.input_file, self.test_output_dir)\r
183         call = call + " --persistence legacy"\r
184         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
185         proc.wait()\r
186 \r
187 \r
188         copyfile(self.call_output, test_file)\r
189 \r
190         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
191 \r
192         test = pd.read_table(test_file)\r
193         baseline = pd.read_table(baseline_file)\r
194         assert_frame_equal(test,baseline)\r
195 \r
196     def test_pwr(self):\r
197         test_filename =  "persistence_" + self.wikiq_out_name\r
198         test_file = os.path.join(self.test_output_dir, test_filename)\r
199         if os.path.exists(test_file): \r
200            os.remove(test_file)\r
201         \r
202         call = self.base_call.format(self.input_file, self.test_output_dir)\r
203         call = call + " --persistence"\r
204         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
205         proc.wait()\r
206 \r
207 \r
208         copyfile(self.call_output, test_file)\r
209 \r
210         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
211 \r
212         test = pd.read_table(test_file)\r
213         baseline = pd.read_table(baseline_file)\r
214         assert_frame_equal(test,baseline)\r
215 \r
216 \r
217     def test_url_encode(self):\r
218         test_filename =  "url-encode_" + self.wikiq_out_name\r
219 \r
220         test_file = os.path.join(self.test_output_dir, test_filename)\r
221         if os.path.exists(test_file):\r
222             os.remove(test_file)\r
223         \r
224         call = self.base_call.format(self.input_file, self.test_output_dir)\r
225         call = call + " --url-encode"\r
226         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)\r
227         proc.wait()\r
228 \r
229         copyfile(self.call_output, test_file)\r
230         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
231         test = pd.read_table(test_file)\r
232         baseline = pd.read_table(baseline_file)\r
233         assert_frame_equal(test,baseline)\r
234 \r
235 \r
236 class Test_Malformed(unittest.TestCase):\r
237     def setUp(self):\r
238         if not os.path.exists("test_output"):\r
239             os.mkdir("test_output")\r
240 \r
241         self.wiki = 'twinpeaks'\r
242         self.wikiq_out_name =  self.wiki + ".tsv"\r
243         self.test_output_dir = os.path.join(".", "test_output")\r
244         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)\r
245 \r
246         self.infile = "{0}.xml.7z".format(self.wiki)\r
247         self.base_call = "../wikiq {0} -o {1}"\r
248         self.input_dir = "dumps"\r
249         self.input_file = os.path.join(".", self.input_dir,self.infile)\r
250 \r
251 \r
252     def test_malformed_noargs(self):\r
253 \r
254         call = self.base_call.format(self.input_file, self.test_output_dir)\r
255         proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)\r
256         proc.wait()\r
257         outs, errs = proc.communicate()\r
258         errlines = str(errs).split("\\n")\r
259         self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')\r
260 \r
261 class Test_Stdout(unittest.TestCase):\r
262 \r
263     def setUp(self):\r
264         self.wiki = 'sailormoon'\r
265         self.wikiq_out_name =  self.wiki + ".tsv"\r
266 \r
267         self.infile = "{0}.xml.7z".format(self.wiki)\r
268         self.base_call = "../wikiq {0} --stdout"\r
269         self.input_dir = "dumps"\r
270         self.input_file = os.path.join(".", self.input_dir,self.infile)\r
271         self.baseline_output_dir = "baseline_output"\r
272 \r
273     def test_noargs(self):\r
274 \r
275         call = self.base_call.format(self.input_file)\r
276         proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)\r
277         outs = proc.stdout.decode("utf8")\r
278 \r
279         test_file = "noargs_" + self.wikiq_out_name\r
280         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)\r
281         print(baseline_file)\r
282         test = pd.read_table(StringIO(outs))\r
283         baseline = pd.read_table(baseline_file)\r
284         assert_frame_equal(test,baseline)\r
285 \r
286 class Test_Regex(unittest.TestCase):\r
287 \r
288     def setUp(self):\r
289         self.wiki = 'emptytext'\r
290         self.wikiq_out_name = self.wiki + '.tsv'\r
291         self.infile = "{0}.xml.bz2".format(self.wiki)\r
292 \r
293         self.input_dir = "dumps"\r
294         self.input_file = os.path.join(".", self.input_dir,self.infile)\r
295 \r
296         if not os.path.exists("test_output"):\r
297             os.mkdir("test_output")\r
298 \r
299         self.test_output_dir = os.path.join(".", "test_output")\r
300         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)\r
301         # we have two base calls, one for checking inputs and the other for checking outputs\r
302         self.base_call = "../wikiq {0}"\r
303         self.base_call_outs = "../wikiq {0} -o {1}"\r
304 \r
305         self.baseline_output_dir = "baseline_output"\r
306 \r
307         # sample inputs for checking that bad inputs get terminated / test_regex_inputs\r
308         self.bad_inputs_list = [\r
309             #label is missing            \r
310             "-RP '\\b\\d+\\b'", \r
311             #number of reg and number of labels do not match \r
312             "-RP 'NPO V' -RP THE -RPl testlabel",\r
313             #cp but rp label\r
314             "-CP '(Tamil|Li)' -RPl testlabel",\r
315             #regex is missing\r
316             "-CPl testlabel",\r
317             "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"\r
318         ]\r
319 \r
320         # sample inputs for checking the outcomes of good inputs / test_basic_regex\r
321         self.good_inputs_list = [\r
322             "-RP '\\b\\d{3}\\b' -RPl threedigits",\r
323             "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",\r
324             "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",\r
325             "-CP 'WP:EVADE' -CPl wp_evade"         \r
326         ]\r
327 \r
328         \r
329         self.cap_inputs_list = [\r
330             "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",\r
331             "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"\r
332         ]\r
333 \r
334 \r
335 \r
336     def test_regex_inputs(self):\r
337         for input in self.bad_inputs_list:\r
338             call = self.base_call.format(self.input_file)\r
339             call = call + " --stdout " + input\r
340             #print(call)\r
341             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)\r
342             stdout,stderr = proc.communicate()\r
343             #print(proc.returncode)\r
344             \r
345             # we want to check that the bad inputs were caught and sys.exit is stopping the code\r
346             #print(stderr.decode("utf-8"))\r
347             self.assertNotEqual(proc.returncode,0)\r
348 \r
349     def test_basic_regex(self):\r
350         for i, input in enumerate(self.good_inputs_list):\r
351 \r
352             test_filename = "basic_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))\r
353             #print(test_filename)\r
354             test_file = os.path.join(self.test_output_dir, test_filename)\r
355             if os.path.exists(test_file):\r
356                 os.remove(test_file)\r
357 \r
358             call = self.base_call_outs.format(self.input_file, self.test_output_dir)\r
359             call = call + " " + input\r
360             #print(call)\r
361 \r
362             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)\r
363             proc.wait()\r
364             copyfile(self.call_output, test_file)\r
365 \r
366             test = pd.read_table(test_file)\r
367             \r
368             baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
369             baseline = pd.read_table(baseline_file)\r
370             #assert_frame_equal(test, baseline)\r
371             #print(i)\r
372 \r
373 \r
374     def test_capturegroup_regex(self):\r
375         for i, input in enumerate(self.cap_inputs_list):\r
376             test_filename = "capturegroup_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))\r
377             #print(test_filename)\r
378             test_file = os.path.join(self.test_output_dir, test_filename)\r
379             if os.path.exists(test_file):\r
380                 os.remove(test_file)\r
381 \r
382             call = self.base_call_outs.format(self.input_file, self.test_output_dir)\r
383             call = call + " " + input\r
384             #print(call)\r
385 \r
386             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)\r
387             proc.wait()\r
388 \r
389             copyfile(self.call_output, test_file)\r
390             \r
391             test = pd.read_table(test_file)\r
392             \r
393             baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)\r
394             baseline = pd.read_table(baseline_file)\r
395             #assert_frame_equal(test, baseline)\r
396 \r
397 \r
398 if __name__ == '__main__':\r
399     unittest.main()\r

Community Data Science Collective || Want to submit a patch?