]> code.communitydata.science - mediawiki_dump_tools.git/blob - test/Wikiq_Unit_Test.py
remove commented code
[mediawiki_dump_tools.git] / test / Wikiq_Unit_Test.py
1 import unittest
2 import os
3 import subprocess
4 from shutil import copyfile
5 import pandas as pd
6 from pandas.util.testing import assert_frame_equal
7 from io import StringIO
8
9 # with / without pwr DONE
10 # with / without url encode DONE
11 # with / without collapse user DONE
12 # with output to sdtout DONE
13 # note that the persistence radius is 7 by default
14 # reading various file formats including
15 #        7z, gz, bz2, xml  DONE
16 # wikia and wikipedia data DONE
17 # malformed xmls DONE
18
19 class Test_Wikipedia(unittest.TestCase):
20     def setUp(self):
21         if not os.path.exists("test_output"):
22             os.mkdir("test_output")
23
24         self.wiki = 'ikwiki-20180301-pages-meta-history'
25         self.wikiq_out_name =  self.wiki + ".tsv"
26         self.test_output_dir = os.path.join(".", "test_output")
27         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
28
29         self.infile = "{0}.xml.bz2".format(self.wiki)    
30         self.base_call = "../wikiq {0} -o {1}"
31         self.input_dir = "dumps"
32         self.input_file = os.path.join(".", self.input_dir,self.infile)
33         self.baseline_output_dir = "baseline_output"
34
35     def test_WP_url_encode(self):
36         test_filename =  "url-encode_" + self.wikiq_out_name
37         test_file = os.path.join(self.test_output_dir, test_filename)
38         if os.path.exists(test_file):
39             os.remove(test_file)
40         
41         call = self.base_call.format(self.input_file, self.test_output_dir)
42         call = call + " --url-encode"
43         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
44         proc.wait()
45
46         copyfile(self.call_output, test_file)
47         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
48
49         # as a test let's make sure that we get equal data frames
50         test = pd.read_table(test_file)
51         baseline = pd.read_table(baseline_file)
52         assert_frame_equal(test,baseline)
53
54     def test_WP_namespaces(self):
55         print(os.path.abspath('.'))
56         test_filename =  "namespaces_" + self.wikiq_out_name
57         test_file = os.path.join(self.test_output_dir, test_filename)
58         if os.path.exists(test_file):
59             os.remove(test_file)
60         
61         call = self.base_call.format(self.input_file, self.test_output_dir)
62         call = call + " -n 0 -n 1"
63         print(call)
64         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
65         proc.wait()
66         copyfile(self.call_output, test_file)
67         baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)
68
69         # as a test let's make sure that we get equal data frames
70         test = pd.read_table(test_file)
71         num_wrong_ns = sum(~ test.namespace.isin({0,1}))
72         self.assertEqual(num_wrong_ns, 0)
73         baseline = pd.read_table(baseline_file)
74         assert_frame_equal(test,baseline)
75
76     def test_WP_revert_radius(self):
77         print(os.path.abspath('.'))
78         test_filename =  "revert_radius_" + self.wikiq_out_name
79         test_file = os.path.join(self.test_output_dir, test_filename)
80         if os.path.exists(test_file):
81             os.remove(test_file)
82         
83         call = self.base_call.format(self.input_file, self.test_output_dir)
84         call = call + " -n 0 -n 1 -rr 1"
85         print(call)
86         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
87         proc.wait()
88         copyfile(self.call_output, test_file)
89         baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)
90
91         # as a test let's make sure that we get equal data frames
92         test = pd.read_table(test_file)
93         num_wrong_ns = sum(~ test.namespace.isin({0,1}))
94         self.assertEqual(num_wrong_ns, 0)
95         baseline = pd.read_table(baseline_file)
96         assert_frame_equal(test,baseline)
97
98
99
100 class Test_Basic(unittest.TestCase):
101
102     def setUp(self):
103         if not os.path.exists("test_output"):
104             os.mkdir("test_output")
105
106         self.wiki = 'sailormoon'
107         self.wikiq_out_name =  self.wiki + ".tsv"
108         self.test_output_dir = os.path.join(".", "test_output")
109         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
110
111         self.infile = "{0}.xml.7z".format(self.wiki)
112         self.base_call = "../wikiq {0} -o {1}"
113         self.input_dir = "dumps"
114         self.input_file = os.path.join(".", self.input_dir,self.infile)
115         self.baseline_output_dir = "baseline_output"
116
117     def test_noargs(self):
118
119         test_filename =  "noargs_" + self.wikiq_out_name
120         test_file = os.path.join(self.test_output_dir, test_filename)
121         if os.path.exists(test_file):
122             os.remove(test_file)
123         
124         call = self.base_call.format(self.input_file, self.test_output_dir)
125         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
126         proc.wait()
127
128         copyfile(self.call_output, test_file)
129
130         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
131
132         test = pd.read_table(test_file)
133         baseline = pd.read_table(baseline_file)
134         assert_frame_equal(test,baseline)
135
136
137     def test_collapse_user(self):
138         test_filename =  "collapse-user_" + self.wikiq_out_name
139         test_file = os.path.join(self.test_output_dir, test_filename)
140         if os.path.exists(test_file):
141             os.remove(test_file)
142         
143         call = self.base_call.format(self.input_file, self.test_output_dir)
144         call = call + " --collapse-user"
145
146         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
147         proc.wait()
148
149         copyfile(self.call_output, test_file)
150
151         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
152         test = pd.read_table(test_file)
153         baseline = pd.read_table(baseline_file)
154         assert_frame_equal(test,baseline)
155
156     def test_pwr_segment(self):
157         test_filename =  "persistence_segment_" + self.wikiq_out_name
158         test_file = os.path.join(self.test_output_dir, test_filename)
159         if os.path.exists(test_file):
160             os.remove(test_file)
161         
162         call = self.base_call.format(self.input_file, self.test_output_dir)
163         call = call + " --persistence segment"
164         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
165         proc.wait()
166
167
168         copyfile(self.call_output, test_file)
169
170         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
171
172         test = pd.read_table(test_file)
173         baseline = pd.read_table(baseline_file)
174         assert_frame_equal(test,baseline)
175
176     def test_pwr_legacy(self):
177         test_filename =  "persistence_legacy_" + self.wikiq_out_name
178         test_file = os.path.join(self.test_output_dir, test_filename)
179         if os.path.exists(test_file):
180             os.remove(test_file)
181         
182         call = self.base_call.format(self.input_file, self.test_output_dir)
183         call = call + " --persistence legacy"
184         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
185         proc.wait()
186
187
188         copyfile(self.call_output, test_file)
189
190         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
191
192         test = pd.read_table(test_file)
193         baseline = pd.read_table(baseline_file)
194         assert_frame_equal(test,baseline)
195
196     def test_pwr(self):
197         test_filename =  "persistence_" + self.wikiq_out_name
198         test_file = os.path.join(self.test_output_dir, test_filename)
199         if os.path.exists(test_file): 
200            os.remove(test_file)
201         
202         call = self.base_call.format(self.input_file, self.test_output_dir)
203         call = call + " --persistence"
204         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
205         proc.wait()
206
207
208         copyfile(self.call_output, test_file)
209
210         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
211
212         test = pd.read_table(test_file)
213         baseline = pd.read_table(baseline_file)
214         assert_frame_equal(test,baseline)
215
216
217     def test_url_encode(self):
218         test_filename =  "url-encode_" + self.wikiq_out_name
219
220         test_file = os.path.join(self.test_output_dir, test_filename)
221         if os.path.exists(test_file):
222             os.remove(test_file)
223         
224         call = self.base_call.format(self.input_file, self.test_output_dir)
225         call = call + " --url-encode"
226         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
227         proc.wait()
228
229         copyfile(self.call_output, test_file)
230         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
231         test = pd.read_table(test_file)
232         baseline = pd.read_table(baseline_file)
233         assert_frame_equal(test,baseline)
234
235
236 class Test_Malformed(unittest.TestCase):
237     def setUp(self):
238         if not os.path.exists("test_output"):
239             os.mkdir("test_output")
240
241         self.wiki = 'twinpeaks'
242         self.wikiq_out_name =  self.wiki + ".tsv"
243         self.test_output_dir = os.path.join(".", "test_output")
244         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
245
246         self.infile = "{0}.xml.7z".format(self.wiki)
247         self.base_call = "../wikiq {0} -o {1}"
248         self.input_dir = "dumps"
249         self.input_file = os.path.join(".", self.input_dir,self.infile)
250
251
252     def test_malformed_noargs(self):
253
254         call = self.base_call.format(self.input_file, self.test_output_dir)
255         proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
256         proc.wait()
257         outs, errs = proc.communicate()
258         errlines = str(errs).split("\\n")
259         self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
260
261 class Test_Stdout(unittest.TestCase):
262
263     def setUp(self):
264         self.wiki = 'sailormoon'
265         self.wikiq_out_name =  self.wiki + ".tsv"
266
267         self.infile = "{0}.xml.7z".format(self.wiki)
268         self.base_call = "../wikiq {0} --stdout"
269         self.input_dir = "dumps"
270         self.input_file = os.path.join(".", self.input_dir,self.infile)
271         self.baseline_output_dir = "baseline_output"
272
273     def test_noargs(self):
274
275         call = self.base_call.format(self.input_file)
276         proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
277         outs = proc.stdout.decode("utf8")
278
279         test_file = "noargs_" + self.wikiq_out_name
280         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
281         print(baseline_file)
282         test = pd.read_table(StringIO(outs))
283         baseline = pd.read_table(baseline_file)
284         assert_frame_equal(test,baseline)
285
286 class Test_Regex(unittest.TestCase):
287
288     def setUp(self):
289         self.wiki = 'regextest'
290         self.wikiq_out_name = self.wiki + '.tsv'
291         self.infile = "{0}.xml.bz2".format(self.wiki)
292
293         self.input_dir = "dumps"
294         self.input_file = os.path.join(".", self.input_dir,self.infile)
295
296         if not os.path.exists("test_output"):
297             os.mkdir("test_output")
298
299         self.test_output_dir = os.path.join(".", "test_output")
300         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
301         # we have two base calls, one for checking inputs and the other for checking outputs
302         self.base_call = "../wikiq {0}"
303         self.base_call_outs = "../wikiq {0} -o {1}"
304
305         self.baseline_output_dir = "baseline_output"
306
307         # sample inputs for checking that bad inputs get terminated / test_regex_inputs
308         self.bad_inputs_list = [
309             #label is missing            
310             "-RP '\\b\\d+\\b'", 
311             #number of reg and number of labels do not match 
312             "-RP 'NPO V' -RP THE -RPl testlabel",
313             #cp but rp label
314             "-CP '(Tamil|Li)' -RPl testlabel",
315             #regex is missing
316             "-CPl testlabel",
317             "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"
318         ]
319
320         # sample inputs for checking the outcomes of good inputs / test_basic_regex
321         self.good_inputs_list = [
322             "-RP '\\b\\d{3}\\b' -RPl threedigits",
323             "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
324             "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
325             "-CP 'WP:EVADE' -CPl wp_evade"         
326         ]
327
328         
329         self.cap_inputs_list = [
330             "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
331             "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"
332         ]
333
334
335
336     def test_regex_inputs(self):
337         for input in self.bad_inputs_list:
338             call = self.base_call.format(self.input_file)
339             call = call + " --stdout " + input
340             print(call)
341             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
342             stdout,stderr = proc.communicate()
343             #print(proc.returncode)
344             
345             # we want to check that the bad inputs were caught and sys.exit is stopping the code
346             print(stderr.decode("utf-8"))
347             self.assertNotEqual(proc.returncode,0)
348
349     def test_basic_regex(self):
350         for i, input in enumerate(self.good_inputs_list):
351
352             test_filename = "basic_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))
353             #print(test_filename)
354             test_file = os.path.join(self.test_output_dir, test_filename)
355             if os.path.exists(test_file):
356                 os.remove(test_file)
357
358             call = self.base_call_outs.format(self.input_file, self.test_output_dir)
359             call = call + " " + input
360             print(call)
361
362             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
363             proc.wait()
364             copyfile(self.call_output, test_file)
365
366             test = pd.read_table(test_file)
367             
368             baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
369             baseline = pd.read_table(baseline_file)
370             assert_frame_equal(test, baseline)
371             print(i)
372
373
374     def test_capturegroup_regex(self):
375         for i, input in enumerate(self.cap_inputs_list):
376             test_filename = "capturegroup_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))
377             print(test_filename)
378             test_file = os.path.join(self.test_output_dir, test_filename)
379             if os.path.exists(test_file):
380                 os.remove(test_file)
381
382             call = self.base_call_outs.format(self.input_file, self.test_output_dir)
383             call = call + " " + input
384             print(call)
385
386             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
387             proc.wait()
388
389             copyfile(self.call_output, test_file)
390             
391             test = pd.read_table(test_file)
392             
393             baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
394             baseline = pd.read_table(baseline_file)
395             assert_frame_equal(test, baseline)
396
397
398 if __name__ == '__main__':
399     unittest.main()

Community Data Science Collective || Want to submit a patch?