4 from shutil import copyfile
6 from pandas.testing import assert_frame_equal
7 from io import StringIO
9 # with / without pwr DONE
10 # with / without url encode DONE
11 # with / without collapse user DONE
12 # with output to stdout DONE
13 # note that the persistence radius is 7 by default
14 # reading various file formats including
15 # 7z, gz, bz2, xml DONE
16 # wikia and wikipedia data DONE
class Test_Wikipedia(unittest.TestCase):
    """Run wikiq on a bz2-compressed Wikipedia dump and compare with baselines.

    Every test invokes ``../wikiq`` through the shell, copies
    ``self.call_output`` to a test-specific file under ``test_output`` and
    compares that file with the one of the same name under
    ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests share."""
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _run_wikiq(self, test_filename, extra_args=""):
        """Run wikiq and load its output next to the stored baseline.

        Args:
            test_filename: File name used both for the copy kept under
                ``self.test_output_dir`` and for the baseline lookup.
            extra_args: Command-line options appended to the base call.

        Returns:
            A ``(test, baseline)`` tuple of data frames.
        """
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Drop the copy left behind by an earlier run of the same test.
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        if extra_args:
            call = call + " " + extra_args

        # run() blocks until wikiq exits and drains the pipe while waiting,
        # so the output file is complete before it is copied and a chatty
        # child cannot stall on a full stdout pipe.
        subprocess.run(call, stdout=subprocess.PIPE, shell=True)

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        return pd.read_table(test_file), pd.read_table(baseline_file)

    def test_WP_url_encode(self):
        """Output produced with --url-encode matches the baseline."""
        test, baseline = self._run_wikiq("url-encode_" + self.wikiq_out_name,
                                         "--url-encode")

        # as a test let's make sure that we get equal data frames
        assert_frame_equal(test, baseline, check_like=True)

    def test_WP_namespaces(self):
        """With -n 0 -n 1 only namespaces 0 and 1 appear in the output."""
        print(os.path.abspath('.'))
        test, baseline = self._run_wikiq("namespaces_" + self.wikiq_out_name,
                                         "-n 0 -n 1")

        # no row may belong to a namespace that was not requested
        num_wrong_ns = (~test.namespace.isin({0, 1})).sum()
        self.assertEqual(num_wrong_ns, 0)

        # as a test let's make sure that we get equal data frames
        assert_frame_equal(test, baseline, check_like=True)

    def test_WP_revert_radius(self):
        """With -n 0 -n 1 -rr 1 the output matches the baseline."""
        print(os.path.abspath('.'))
        test, baseline = self._run_wikiq("revert_radius_" + self.wikiq_out_name,
                                         "-n 0 -n 1 -rr 1")

        # no row may belong to a namespace that was not requested
        num_wrong_ns = (~test.namespace.isin({0, 1})).sum()
        self.assertEqual(num_wrong_ns, 0)

        # as a test let's make sure that we get equal data frames
        assert_frame_equal(test, baseline, check_like=True)
class Test_Basic(unittest.TestCase):
    """Run wikiq on the 7z-compressed ``sailormoon`` dump with various options.

    Every test invokes ``../wikiq`` through the shell, copies
    ``self.call_output`` to a test-specific file under ``test_output`` and
    compares that file with the one of the same name under
    ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests share."""
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _run_wikiq(self, test_filename, extra_args=""):
        """Run wikiq and load its output next to the stored baseline.

        Args:
            test_filename: File name used both for the copy kept under
                ``self.test_output_dir`` and for the baseline lookup.
            extra_args: Command-line options appended to the base call.

        Returns:
            A ``(test, baseline)`` tuple of data frames.
        """
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Drop the copy left behind by an earlier run of the same test.
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        if extra_args:
            call = call + " " + extra_args

        # run() blocks until wikiq exits and drains the pipe while waiting,
        # so the output file is complete before it is copied and a chatty
        # child cannot stall on a full stdout pipe.
        subprocess.run(call, stdout=subprocess.PIPE, shell=True)

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        return pd.read_table(test_file), pd.read_table(baseline_file)

    def test_noargs(self):
        """Output produced without extra options matches the baseline."""
        test, baseline = self._run_wikiq("noargs_" + self.wikiq_out_name)
        assert_frame_equal(test, baseline, check_like=True)

    def test_collapse_user(self):
        """Output produced with --collapse-user matches the baseline."""
        test, baseline = self._run_wikiq("collapse-user_" + self.wikiq_out_name,
                                         "--collapse-user")
        assert_frame_equal(test, baseline, check_like=True)

    def test_pwr_segment(self):
        """Output produced with --persistence segment matches the baseline."""
        test, baseline = self._run_wikiq("persistence_segment_" + self.wikiq_out_name,
                                         "--persistence segment")
        assert_frame_equal(test, baseline, check_like=True)

    def test_pwr_legacy(self):
        """Output produced with --persistence legacy matches the baseline."""
        test, baseline = self._run_wikiq("persistence_legacy_" + self.wikiq_out_name,
                                         "--persistence legacy")
        assert_frame_equal(test, baseline, check_like=True)

    # NOTE(review): the header of this test was missing from the reviewed
    # text; the name follows the sibling test_pwr_* methods -- confirm.
    def test_pwr(self):
        """Output produced with a bare --persistence flag matches the baseline."""
        test, baseline = self._run_wikiq("persistence_" + self.wikiq_out_name,
                                         "--persistence")

        # Sort the columns by name before comparing.
        test = test.reindex(columns=sorted(test.columns))
        assert_frame_equal(test, baseline, check_like=True)

    def test_url_encode(self):
        """Output produced with --url-encode matches the baseline."""
        test, baseline = self._run_wikiq("url-encode_" + self.wikiq_out_name,
                                         "--url-encode")

        # Sort the columns by name before comparing.
        test = test.reindex(columns=sorted(test.columns))
        assert_frame_equal(test, baseline, check_like=True)
class Test_Malformed(unittest.TestCase):
    """Check how wikiq reports a truncated (malformed) XML dump."""

    def setUp(self):
        """Create the output directory and record the paths the test uses."""
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

    def test_malformed_noargs(self):
        """The last line wikiq writes to stderr is the expected ParseError."""
        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = subprocess.run(call, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, shell=True)

        # Decode the bytes instead of parsing their repr(), and guard against
        # empty stderr so the failure is an assertion message rather than an
        # IndexError.
        errlines = proc.stderr.decode("utf8", errors="replace").splitlines()
        last_line = errlines[-1] if errlines else ""
        self.assertEqual(last_line, 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
class Test_Stdout(unittest.TestCase):
    """Check that ``wikiq --stdout`` emits the same table as the baseline file."""

    def setUp(self):
        """Record the input and baseline paths the test uses."""
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        """Table captured from stdout matches the ``noargs_`` baseline."""
        call = self.base_call.format(self.input_file)

        # Capture everything wikiq prints and parse it in memory.
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        outs = proc.stdout.decode("utf8")

        test_file = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)

        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline, check_like=True)
class Test_Regex(unittest.TestCase):
    """Exercise wikiq's -RP/-RPl/-CP/-CPl pattern options.

    Bad option combinations must make wikiq exit with a non-zero status;
    good ones must reproduce the stored baseline tables.
    """

    def setUp(self):
        """Create the output directory and define paths and sample inputs."""
        self.wiki = 'regextest'
        self.wikiq_out_name = self.wiki + '.tsv'
        self.infile = "{0}.xml.bz2".format(self.wiki)

        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("test_output", exist_ok=True)

        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
        # we have two base calls, one for checking inputs and the other for checking outputs
        self.base_call = "../wikiq {0}"
        self.base_call_outs = "../wikiq {0} -o {1}"

        self.baseline_output_dir = "baseline_output"

        # sample inputs for checking that bad inputs get terminated / test_regex_inputs
        # NOTE(review): the reviewed text skipped lines inside this list;
        # confirm that no entries were lost.
        self.bad_inputs_list = [
            # number of regexes and number of labels do not match
            "-RP 'NPO V' -RP THE -RPl testlabel",
            # a -CP pattern accompanied only by an -RPl label
            "-CP '(Tamil|Li)' -RPl testlabel",
            # a -CP pattern with no -CPl label
            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'",
        ]

        # sample inputs for checking the outcomes of good inputs / test_basic_regex
        self.good_inputs_list = [
            "-RP '\\b\\d{3}\\b' -RPl threedigits",
            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
            "-CP 'WP:EVADE' -CPl wp_evade",
        ]

        # sample inputs using named capture groups / test_capturegroup_regex
        self.cap_inputs_list = [
            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov",
        ]

    def _run_wikiq(self, test_filename, extra_args):
        """Run wikiq with file output and load it next to the baseline.

        Args:
            test_filename: File name used both for the copy kept under
                ``self.test_output_dir`` and for the baseline lookup.
            extra_args: Command-line options appended to the base call.

        Returns:
            A ``(test, baseline)`` tuple of data frames.
        """
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Drop the copy left behind by an earlier run of the same test.
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call_outs.format(self.input_file, self.test_output_dir)
        call = call + " " + extra_args

        # run() blocks until wikiq exits and drains both pipes while waiting,
        # so the output file is complete before it is copied and the child
        # cannot stall on a full stdout/stderr pipe.
        subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        return pd.read_table(test_file), pd.read_table(baseline_file)

    def test_regex_inputs(self):
        """Every bad option combination makes wikiq exit with an error."""
        for args in self.bad_inputs_list:
            # subTest reports each failing combination separately.
            with self.subTest(args=args):
                call = self.base_call.format(self.input_file)
                call = call + " --stdout " + args

                proc = subprocess.run(call, stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE, shell=True)

                # we want to check that the bad inputs were caught and sys.exit is stopping the code
                print(proc.stderr.decode("utf-8"))
                self.assertNotEqual(proc.returncode, 0)

    def test_basic_regex(self):
        """Each good option combination reproduces its ``basic_`` baseline."""
        for i, args in enumerate(self.good_inputs_list):
            with self.subTest(args=args):
                # wikiq_out_name minus its ".tsv" suffix is self.wiki
                test_filename = "basic_{0}_{1}.tsv".format(self.wiki, str(i))
                test, baseline = self._run_wikiq(test_filename, args)
                assert_frame_equal(test, baseline, check_like=True)

    def test_capturegroup_regex(self):
        """Each capture-group input reproduces its ``capturegroup_`` baseline."""
        for i, args in enumerate(self.cap_inputs_list):
            with self.subTest(args=args):
                # wikiq_out_name minus its ".tsv" suffix is self.wiki
                test_filename = "capturegroup_{0}_{1}.tsv".format(self.wiki, str(i))
                test, baseline = self._run_wikiq(test_filename, args)
                assert_frame_equal(test, baseline, check_like=True)
403 if __name__ == '__main__':