4 from shutil import copyfile
\r
6 from pandas.util.testing import assert_frame_equal
\r
7 from io import StringIO
\r
9 # with / without pwr DONE
\r
10 # with / without url encode DONE
\r
11 # with / without collapse user DONE
\r
12 # with output to stdout DONE
\r
13 # note that the persistence radius is 7 by default
\r
14 # reading various file formats including
\r
15 # 7z, gz, bz2, xml DONE
\r
16 # wikia and wikipedia data DONE
\r
17 # malformed xmls DONE
\r
class Test_Wikipedia(unittest.TestCase):
    """Regression tests that run wikiq on the bz2-compressed ikwiki dump.

    Every test invokes ``../wikiq`` through the shell, copies the TSV that
    wikiq writes into ``test_output`` to a test-specific file name, and
    compares that file with the stored baseline of the same name.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests use."""
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _run_wikiq(self, test_filename, extra_args=""):
        """Run wikiq and return its output as a data frame.

        Args:
            test_filename: name under which the wikiq output is saved in
                ``self.test_output_dir``.
            extra_args: text appended verbatim to the base command line
                (include the leading space).

        Returns:
            The saved output parsed with ``pd.read_table``.
        """
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Drop any result left over from a previous run.
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + extra_args
        # run() returns only after wikiq has exited, so the output file is
        # complete before it is copied (a bare Popen would race with copyfile).
        subprocess.run(call, stdout=subprocess.PIPE, shell=True)

        copyfile(self.call_output, test_file)
        return pd.read_table(test_file)

    def _assert_matches_baseline(self, test, test_filename):
        """Assert that ``test`` equals the baseline stored as ``test_filename``."""
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def _assert_only_namespaces_0_and_1(self, test):
        """Assert that every row of ``test`` has namespace 0 or 1."""
        num_wrong_ns = (~test.namespace.isin({0, 1})).sum()
        self.assertEqual(num_wrong_ns, 0)

    def test_WP_url_encode(self):
        """Output produced with ``--url-encode`` matches the baseline."""
        test_filename = "url-encode_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, " --url-encode")

        # as a test let's make sure that we get equal data frames
        self._assert_matches_baseline(test, test_filename)

    def test_WP_namespaces(self):
        """Output restricted with ``-n 0 -n 1`` has only those namespaces."""
        print(os.path.abspath('.'))
        test_filename = "namespaces_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, " -n 0 -n 1")

        # as a test let's make sure that we get equal data frames
        self._assert_only_namespaces_0_and_1(test)
        self._assert_matches_baseline(test, test_filename)

    def test_WP_revert_radius(self):
        """Output produced with ``-n 0 -n 1 -rr 1`` matches the baseline."""
        print(os.path.abspath('.'))
        test_filename = "revert_radius_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, " -n 0 -n 1 -rr 1")

        # as a test let's make sure that we get equal data frames
        self._assert_only_namespaces_0_and_1(test)
        self._assert_matches_baseline(test, test_filename)
\r
class Test_Basic(unittest.TestCase):
    """Regression tests that run wikiq on the 7z-compressed sailormoon dump.

    Each test runs ``../wikiq`` with one set of command-line options, saves
    the resulting TSV under a test-specific name in ``test_output`` and
    compares it with the baseline of the same name in ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests use."""
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _assert_matches_baseline(self, prefix, extra_args=""):
        """Run wikiq and compare its output with the stored baseline.

        Args:
            prefix: text prepended to ``self.wikiq_out_name`` to form the
                file name used for both the saved output and the baseline.
            extra_args: text appended verbatim to the base command line
                (include the leading space).
        """
        test_filename = prefix + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Drop any result left over from a previous run.
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + extra_args
        # run() returns only after wikiq has exited, so the output file is
        # complete before it is copied (a bare Popen would race with copyfile).
        subprocess.run(call, stdout=subprocess.PIPE, shell=True)

        copyfile(self.call_output, test_file)

        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_noargs(self):
        """Output produced without extra options matches the baseline."""
        self._assert_matches_baseline("noargs_")

    def test_collapse_user(self):
        """Output produced with ``--collapse-user`` matches the baseline."""
        self._assert_matches_baseline("collapse-user_", " --collapse-user")

    def test_pwr_segment(self):
        """Output produced with ``--persistence segment`` matches the baseline."""
        self._assert_matches_baseline("persistence_segment_", " --persistence segment")

    def test_pwr_legacy(self):
        """Output produced with ``--persistence legacy`` matches the baseline."""
        self._assert_matches_baseline("persistence_legacy_", " --persistence legacy")

    def test_pwr(self):
        """Output produced with ``--persistence`` matches the baseline."""
        self._assert_matches_baseline("persistence_", " --persistence")

    def test_url_encode(self):
        """Output produced with ``--url-encode`` matches the baseline."""
        self._assert_matches_baseline("url-encode_", " --url-encode")
\r
class Test_Malformed(unittest.TestCase):
    """Checks how wikiq reports a malformed XML dump (twinpeaks)."""

    def setUp(self):
        """Create the output directory and record the paths the test uses."""
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

    def test_malformed_noargs(self):
        """The last line wikiq writes to stderr is the expected ParseError."""
        call = self.base_call.format(self.input_file, self.test_output_dir)
        # run() waits for wikiq and drains both pipes before returning.
        proc = subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

        # Decode the bytes instead of splitting their repr() on a literal
        # backslash-n; splitlines() also drops the trailing newline.
        errlines = proc.stderr.decode("utf8", errors="replace").splitlines()
        self.assertTrue(errlines, "wikiq wrote nothing to stderr")
        self.assertEqual(errlines[-1], 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
\r
class Test_Stdout(unittest.TestCase):
    """Checks the TSV that wikiq writes to standard output with ``--stdout``."""

    def setUp(self):
        """Record the command line and paths the test uses."""
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        """The table captured from stdout equals the ``noargs_`` baseline."""
        call = self.base_call.format(self.input_file)
        # run() waits for wikiq to exit and collects everything it printed.
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        outs = proc.stdout.decode("utf8")

        test_file = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
        print(baseline_file)
        # Parse the captured text in memory; nothing is written to disk here.
        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)
\r
class Test_Regex(unittest.TestCase):
    """Tests for wikiq's regex options (-RP / -RPl / -CP / -CPl) on the emptytext dump."""

    def setUp(self):
        """Create the output directory and record paths, commands and sample inputs."""
        self.wiki = 'emptytext'
        self.wikiq_out_name = self.wiki + '.tsv'
        self.infile = "{0}.xml.bz2".format(self.wiki)

        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("test_output", exist_ok=True)

        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
        # we have two base calls, one for checking inputs and the other for checking outputs
        self.base_call = "../wikiq {0}"
        self.base_call_outs = "../wikiq {0} -o {1}"

        self.baseline_output_dir = "baseline_output"

        # sample inputs for checking that bad inputs get terminated / test_regex_inputs
        self.bad_inputs_list = [
            "-RP '\\b\\d+\\b'",
            # number of reg and number of labels do not match
            "-RP 'NPO V' -RP THE -RPl testlabel",
            "-CP '(Tamil|Li)' -RPl testlabel",
            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'",
        ]

        # sample inputs for checking the outcomes of good inputs / test_basic_regex
        self.good_inputs_list = [
            "-RP '\\b\\d{3}\\b' -RPl threedigits",
            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
            "-CP 'WP:EVADE' -CPl wp_evade",
        ]

        # sample inputs with named capture groups / test_capturegroup_regex
        self.cap_inputs_list = [
            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov",
        ]

    def _run_and_load(self, prefix, inputs):
        """Run wikiq once per entry of ``inputs`` and load output and baseline.

        Args:
            prefix: first component of the per-input file name
                ``"<prefix>_<wiki>_<index>.tsv"``.
            inputs: command-line fragments appended to the output base call.
        """
        for i, regex_args in enumerate(inputs):
            test_filename = "{0}_{1}_{2}.tsv".format(prefix, self.wikiq_out_name[:-4], str(i))
            test_file = os.path.join(self.test_output_dir, test_filename)
            # Drop any result left over from a previous run.
            if os.path.exists(test_file):
                os.remove(test_file)

            call = self.base_call_outs.format(self.input_file, self.test_output_dir)
            call = call + " " + regex_args

            # run() returns only after wikiq has exited, so the output file is
            # complete before it is copied (a bare Popen would race with copyfile).
            subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

            copyfile(self.call_output, test_file)

            test = pd.read_table(test_file)

            baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
            baseline = pd.read_table(baseline_file)
            # NOTE(review): the frame comparison is disabled in the original
            # test, so only the readability of both files is checked here.
            # Confirm whether it can be re-enabled:
            #assert_frame_equal(test, baseline)

    def test_regex_inputs(self):
        """Every bad input makes wikiq exit with a non-zero return code."""
        for bad_args in self.bad_inputs_list:
            call = self.base_call.format(self.input_file)
            call = call + " --stdout " + bad_args

            proc = subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            # communicate() drains both pipes and waits, which sets returncode.
            stdout, stderr = proc.communicate()

            # we want to check that the bad inputs were caught and sys.exit is stopping the code
            self.assertNotEqual(proc.returncode, 0, msg="accepted bad input: " + bad_args)

    def test_basic_regex(self):
        """wikiq produces a readable TSV for each entry of ``good_inputs_list``."""
        self._run_and_load("basic", self.good_inputs_list)

    def test_capturegroup_regex(self):
        """wikiq produces a readable TSV for each entry of ``cap_inputs_list``."""
        self._run_and_load("capturegroup", self.cap_inputs_list)
\r
398 if __name__ == '__main__':
\r