159fd1088af1f3fbfeb9d10fb0810a099dd6ccc5
[mediawiki_dump_tools.git] / test / Wikiq_Unit_Test.py
import os
import subprocess
import unittest
from io import StringIO
from shutil import copyfile

import pandas as pd

try:
    # Public home of the testing helpers.
    from pandas.testing import assert_frame_equal
except ImportError:
    # Legacy location; deprecated in pandas 1.0 and removed in pandas 2.0.
    from pandas.util.testing import assert_frame_equal
8
9 # with / without pwr DONE
10 # with / without url encode DONE
11 # with / without collapse user DONE
# with output to stdout DONE
13 # note that the persistence radius is 7 by default
14 # reading various file formats including
15 #        7z, gz, bz2, xml  DONE
16 # wikia and wikipedia data DONE
17 # malformed xmls DONE
18
class Test_Wikipedia(unittest.TestCase):
    """Baseline-comparison tests for wikiq run on a bz2 Wikipedia dump.

    Every test runs ``../wikiq`` on ``dumps/<wiki>.xml.bz2`` with a
    particular set of command line flags, copies the produced TSV to a
    test-specific file name under ``test_output`` and compares it with the
    file of the same name under ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests use."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _run_wikiq(self, test_filename, extra_args=""):
        """Run wikiq with ``extra_args`` and return the path of its saved output.

        The output wikiq writes to ``self.call_output`` is copied to
        ``test_filename`` inside the test output directory. The test fails
        if wikiq exits with a non-zero status.
        """
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Remove leftovers from earlier runs so that a stale file can never
        # be mistaken for the output of this invocation.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call.format(self.input_file, self.test_output_dir) + extra_args
        # run() drains the stdout pipe while waiting; Popen.wait() with a
        # piped stdout can block forever once the pipe buffer fills up.
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        self.assertEqual(proc.returncode, 0,
                         "wikiq exited with a non-zero status: {0}".format(call))

        copyfile(self.call_output, test_file)
        return test_file

    def _read_baseline(self, test_filename):
        """Load the baseline table stored under the same name as the test file."""
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        return pd.read_table(baseline_file)

    def _assert_only_namespaces_0_and_1(self, table):
        """Fail if ``table`` has rows whose namespace is neither 0 nor 1."""
        num_wrong_ns = sum(~table.namespace.isin({0, 1}))
        self.assertEqual(num_wrong_ns, 0)

    def test_WP_url_encode(self):
        """Output with --url-encode equals the stored baseline."""
        test_filename = "url-encode_" + self.wikiq_out_name
        test_file = self._run_wikiq(test_filename, " --url-encode")

        # as a test let's make sure that we get equal data frames
        test = pd.read_table(test_file)
        baseline = self._read_baseline(test_filename)
        assert_frame_equal(test, baseline)

    def test_WP_namespaces(self):
        """With -n 0 -n 1 only those namespaces appear and the baseline matches."""
        test_filename = "namespaces_" + self.wikiq_out_name
        test_file = self._run_wikiq(test_filename, " -n 0 -n 1")

        # as a test let's make sure that we get equal data frames
        test = pd.read_table(test_file)
        self._assert_only_namespaces_0_and_1(test)
        baseline = self._read_baseline(test_filename)
        assert_frame_equal(test, baseline)

    def test_WP_revert_radius(self):
        """With -n 0 -n 1 -rr 1 the namespace filter holds and the baseline matches."""
        test_filename = "revert_radius_" + self.wikiq_out_name
        test_file = self._run_wikiq(test_filename, " -n 0 -n 1 -rr 1")

        # as a test let's make sure that we get equal data frames
        test = pd.read_table(test_file)
        self._assert_only_namespaces_0_and_1(test)
        baseline = self._read_baseline(test_filename)
        assert_frame_equal(test, baseline)
97
98
99
class Test_Basic(unittest.TestCase):
    """Baseline-comparison tests for wikiq run on the 7z ``sailormoon`` dump.

    Every test runs ``../wikiq`` on ``dumps/sailormoon.xml.7z`` with one set
    of flags, copies the produced TSV to a test-specific file name under
    ``test_output`` and compares it with the file of the same name under
    ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests use."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _check_against_baseline(self, prefix, extra_args=""):
        """Run wikiq with ``extra_args`` and compare the result to its baseline.

        ``prefix`` is prepended to the wikiq output file name to form the
        name shared by the saved test output and the baseline file. The
        test fails if wikiq exits with a non-zero status or if the two
        tables differ.
        """
        test_filename = prefix + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Remove leftovers from earlier runs so that a stale file can never
        # be mistaken for the output of this invocation.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call.format(self.input_file, self.test_output_dir) + extra_args
        # run() drains the stdout pipe while waiting; Popen.wait() with a
        # piped stdout can block forever once the pipe buffer fills up.
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        self.assertEqual(proc.returncode, 0,
                         "wikiq exited with a non-zero status: {0}".format(call))

        copyfile(self.call_output, test_file)

        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_noargs(self):
        """Output without extra flags equals the stored baseline."""
        self._check_against_baseline("noargs_")

    def test_collapse_user(self):
        """Output with --collapse-user equals the stored baseline."""
        self._check_against_baseline("collapse-user_", " --collapse-user")

    def test_pwr_segment(self):
        """Output with --persistence segment equals the stored baseline."""
        self._check_against_baseline("persistence_segment_", " --persistence segment")

    def test_pwr_legacy(self):
        """Output with --persistence legacy equals the stored baseline."""
        self._check_against_baseline("persistence_legacy_", " --persistence legacy")

    def test_pwr(self):
        """Output with --persistence equals the stored baseline."""
        self._check_against_baseline("persistence_", " --persistence")

    def test_url_encode(self):
        """Output with --url-encode equals the stored baseline."""
        self._check_against_baseline("url-encode_", " --url-encode")
234
235
class Test_Malformed(unittest.TestCase):
    """Check how wikiq reports a truncated (malformed) XML dump."""

    def setUp(self):
        """Create the output directory and record the paths the test uses."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

    def test_malformed_noargs(self):
        """The last line wikiq writes to stderr is the expected ParseError."""
        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        # communicate() reads both pipes while waiting for the process.
        # Calling wait() first can block forever when the child fills a pipe.
        outs, errs = proc.communicate()

        # Decode the bytes instead of parsing their repr(), and ignore blank
        # lines so the check does not depend on the trailing newline.
        decoded = errs.decode("utf8", errors="replace")
        errlines = [line for line in decoded.splitlines() if line.strip()]
        self.assertTrue(errlines, "wikiq wrote nothing to stderr")
        self.assertEqual(errlines[-1], 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
260
class Test_Stdout(unittest.TestCase):
    """Check that the table wikiq prints with --stdout matches the baseline."""

    def setUp(self):
        """Record the dump location, the command template and the baseline dir."""
        wiki_name = 'sailormoon'
        self.wiki = wiki_name
        self.wikiq_out_name = wiki_name + ".tsv"

        self.infile = "{0}.xml.7z".format(wiki_name)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        """Parse wikiq's stdout as a table and compare it with ``noargs_<wiki>.tsv``."""
        command = self.base_call.format(self.input_file)
        completed = subprocess.run(command, stdout=subprocess.PIPE, shell=True)
        captured_text = completed.stdout.decode("utf8")

        # The stdout run is compared with the same baseline as the plain run.
        baseline_name = "noargs_" + self.wikiq_out_name
        baseline_path = os.path.join(".", self.baseline_output_dir, baseline_name)
        print(baseline_path)

        produced = pd.read_table(StringIO(captured_text))
        expected = pd.read_table(baseline_path)
        assert_frame_equal(produced, expected)
285
class Test_Regex(unittest.TestCase):
    """Tests for wikiq's regex options (-RP/-RPl and -CP/-CPl).

    Malformed option combinations must make wikiq exit with a non-zero
    status. Well-formed ones must run successfully; their output is saved
    under ``test_output`` but not yet compared with a baseline.
    """

    def setUp(self):
        """Record paths, command templates and the sample regex arguments."""
        self.wiki = 'regextest'
        self.wikiq_out_name = self.wiki + '.tsv'
        self.infile = "{0}.xml.bz2".format(self.wiki)

        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

        os.makedirs("test_output", exist_ok=True)

        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
        # we have two base calls, one for checking inputs and the other for checking outputs
        self.base_call = "../wikiq {0}"
        self.base_call_outs = "../wikiq {0} -o {1}"

        self.baseline_output_dir = "baseline_output"

        # sample inputs for checking that bad inputs get terminated / test_regex_inputs
        self.bad_input1 = "-RP '\\b\\d+\\b'" #label is missing
        self.bad_input2 = "-RP 'NPO V' -RP THE -RPl testlabel" #number of reg and number of labels do not match
        self.bad_input3 = "-CP '(Tamil|Li)' -RPl testlabel" #cp but rp label
        self.bad_input4 = "-CPl testlabel" #regex is missing
        self.bad_input5 = "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"

        self.bad_inputs_list = [self.bad_input1,self.bad_input2,self.bad_input3,self.bad_input4,self.bad_input5]

        # sample inputs for checking the outcomes of good inputs / test_basic_regex
        self.good_input1 = "-RP '\\b\\d{3}\\b' -RPl threedigits"
        self.good_input2 = "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word"
        self.good_input3 = "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning"
        self.good_input4 = "-CP 'WP:EVADE' -CPl wp_evade"

        self.good_inputs_list = [self.good_input1,self.good_input2,self.good_input3, self.good_input4]

        # and with capture group(s) / test_capturegroup_regex
        self.cap_input1 = "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three"
        self.cap_input2 = "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"

        self.cap_inputs_list = [self.cap_input1,self.cap_input2]

    def _run_and_save_outputs(self, prefix, regex_args_list):
        """Run wikiq once per argument string and keep a numbered copy of each output.

        The copy for the i-th argument string (counting from 1) is named
        ``<prefix>_<wiki>_<i>.tsv``. The test fails if wikiq exits with a
        non-zero status.
        """
        for i, regex_args in enumerate(regex_args_list, start=1):
            with self.subTest(regex_args=regex_args):
                test_filename = "{0}_{1}_{2}.tsv".format(prefix, self.wikiq_out_name[:-4], i)
                test_file = os.path.join(self.test_output_dir, test_filename)
                if os.path.exists(test_file):
                    os.remove(test_file)

                call = self.base_call_outs.format(self.input_file, self.test_output_dir)
                call = call + " " + regex_args

                # run() drains both pipes while waiting; Popen.wait() with
                # piped output can block forever once a pipe buffer fills up.
                proc = subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
                self.assertEqual(proc.returncode, 0,
                                 "wikiq failed for {0!r}:\n{1}".format(
                                     call, proc.stderr.decode("utf-8", errors="replace")))

                copyfile(self.call_output, test_file)
                # Empty the shared output file so the next iteration cannot
                # pick up this iteration's data.
                with open(self.call_output, 'w'):
                    pass

                # TODO: there is no baseline file to compare against yet, so
                # the saved output is only checked by hand; a proper
                # comparison assert still needs to go here.

    def test_regex_inputs(self):
        """Every malformed regex argument set makes wikiq exit with an error."""
        for bad_args in self.bad_inputs_list:
            with self.subTest(bad_args=bad_args):
                call = self.base_call.format(self.input_file)
                call = call + " --stdout " + bad_args
                proc = subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
                proc.communicate()

                # we want to check that the bad inputs were caught and sys.exit is stopping the code
                self.assertNotEqual(proc.returncode, 0,
                                    "wikiq accepted bad arguments: {0}".format(call))

    def test_basic_regex(self):
        """wikiq runs successfully for each well-formed regex argument set."""
        self._run_and_save_outputs("basic", self.good_inputs_list)

    def test_capturegroup_regex(self):
        """wikiq runs successfully for regexes that use named capture groups."""
        self._run_and_save_outputs("capturegroup", self.cap_inputs_list)
398
# Run every test case defined in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()

Community Data Science Collective || Want to submit a patch?