import os
import subprocess
import unittest
from io import StringIO
from shutil import copyfile

import pandas as pd

try:
    # Public location of the testing helpers (pandas >= 0.20).
    from pandas.testing import assert_frame_equal
except ImportError:
    # Legacy location, removed in pandas 2.0; kept for very old installations.
    from pandas.util.testing import assert_frame_equal
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
#        7z, gz, bz2, xml  DONE
# wikia and wikipedia data DONE
class Test_Wikipedia(unittest.TestCase):
    """Baseline regression tests for wikiq on a bz2-compressed Wikipedia dump.

    Every test runs ``../wikiq`` on the dump, copies the TSV that wikiq is
    expected to write into ``test_output`` under a test-specific name, and
    compares that copy with the file of the same name in ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths shared by the tests."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        # Path the tests expect wikiq to write to.  It is the same for every
        # test, which is why each test copies it to a test-specific name.
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _run_wikiq(self, test_filename, extra_args=""):
        """Run wikiq with ``extra_args`` and return the loaded output table.

        The output is copied to ``test_output/<test_filename>`` before it is
        read, so the result of each test survives later runs.
        """
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Drop the copy left by an earlier run so a stale file is never compared.
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        if extra_args:
            call = call + " " + extra_args

        # subprocess.run blocks until wikiq has exited and drains both pipes,
        # so the output file is complete before it is copied.
        proc = subprocess.run(call, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, shell=True)
        self.assertEqual(
            proc.returncode, 0,
            msg="wikiq failed for {0!r}:\n{1}".format(
                call, proc.stderr.decode("utf-8", errors="replace")))

        copyfile(self.call_output, test_file)
        return pd.read_table(test_file)

    def _assert_matches_baseline(self, test, test_filename):
        """Assert that the ``test`` frame equals the stored baseline table."""
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        baseline = pd.read_table(baseline_file)
        # as a test let's make sure that we get equal data frames
        assert_frame_equal(test, baseline)

    def _assert_only_namespaces(self, test, allowed):
        """Assert that every row of ``test`` has a namespace in ``allowed``."""
        num_wrong_ns = int((~test.namespace.isin(allowed)).sum())
        self.assertEqual(num_wrong_ns, 0)

    def test_WP_url_encode(self):
        """Output produced with ``--url-encode`` matches the baseline."""
        test_filename = "url-encode_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "--url-encode")
        self._assert_matches_baseline(test, test_filename)

    def test_WP_namespaces(self):
        """``-n 0 -n 1`` keeps only namespaces 0 and 1 and matches the baseline."""
        test_filename = "namespaces_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "-n 0 -n 1")
        self._assert_only_namespaces(test, {0, 1})
        self._assert_matches_baseline(test, test_filename)

    def test_WP_revert_radius(self):
        """``-n 0 -n 1 -rr 1`` keeps only namespaces 0 and 1 and matches the baseline."""
        test_filename = "revert_radius_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "-n 0 -n 1 -rr 1")
        self._assert_only_namespaces(test, {0, 1})
        self._assert_matches_baseline(test, test_filename)
class Test_Basic(unittest.TestCase):
    """Baseline regression tests for wikiq on the 7z-compressed ``sailormoon`` dump.

    Every test runs ``../wikiq`` with one set of options, copies the TSV that
    wikiq is expected to write into ``test_output`` under a test-specific
    name, and compares that copy with the file of the same name in
    ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths shared by the tests."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        # Path the tests expect wikiq to write to.  It is the same for every
        # test, which is why each test copies it to a test-specific name.
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _check_against_baseline(self, prefix, extra_args=""):
        """Run wikiq with ``extra_args`` and compare its output to the baseline.

        ``prefix`` is prepended to the wikiq output name to form the file name
        used both for the saved copy and for the baseline lookup.
        """
        test_filename = prefix + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        # Drop the copy left by an earlier run so a stale file is never compared.
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        if extra_args:
            call = call + " " + extra_args

        # subprocess.run blocks until wikiq has exited and drains both pipes,
        # so the output file is complete before it is copied.
        proc = subprocess.run(call, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, shell=True)
        self.assertEqual(
            proc.returncode, 0,
            msg="wikiq failed for {0!r}:\n{1}".format(
                call, proc.stderr.decode("utf-8", errors="replace")))

        copyfile(self.call_output, test_file)

        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_noargs(self):
        """Output produced without extra options matches the baseline."""
        self._check_against_baseline("noargs_")

    def test_collapse_user(self):
        """Output produced with ``--collapse-user`` matches the baseline."""
        self._check_against_baseline("collapse-user_", "--collapse-user")

    def test_pwr_segment(self):
        """Output produced with ``--persistence segment`` matches the baseline."""
        self._check_against_baseline("persistence_segment_", "--persistence segment")

    def test_pwr_legacy(self):
        """Output produced with ``--persistence legacy`` matches the baseline."""
        self._check_against_baseline("persistence_legacy_", "--persistence legacy")

    def test_pwr(self):
        """Output produced with a bare ``--persistence`` matches the baseline."""
        self._check_against_baseline("persistence_", "--persistence")

    def test_url_encode(self):
        """Output produced with ``--url-encode`` matches the baseline."""
        self._check_against_baseline("url-encode_", "--url-encode")
class Test_Malformed(unittest.TestCase):
    """Check how wikiq reports a dump that the test expects to be unparsable."""

    def setUp(self):
        """Create the output directory and record the paths used by the test."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

    def test_malformed_noargs(self):
        """The last line wikiq writes to stderr is the expected ParseError."""
        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = subprocess.run(call, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, shell=True)

        # Decode the real text instead of parsing the repr() of a bytes
        # object, and skip blank lines so a trailing newline does not matter.
        stderr_text = proc.stderr.decode("utf-8", errors="replace")
        errlines = [line for line in stderr_text.splitlines() if line.strip()]

        self.assertTrue(errlines, msg="wikiq wrote nothing to stderr")
        self.assertEqual(
            errlines[-1],
            'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
class Test_Stdout(unittest.TestCase):
    """Check that ``wikiq --stdout`` prints the same table as the ``noargs`` baseline."""

    def setUp(self):
        """Record the input path, baseline directory and command template."""
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        """The table written to stdout equals the stored ``noargs`` baseline."""
        call = self.base_call.format(self.input_file)
        proc = subprocess.run(call, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, shell=True)
        # Fail with wikiq's own error text rather than with a confusing
        # parser error on empty or truncated output.
        self.assertEqual(
            proc.returncode, 0,
            msg="wikiq failed for {0!r}:\n{1}".format(
                call, proc.stderr.decode("utf-8", errors="replace")))
        outs = proc.stdout.decode("utf8")

        test_file = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)

        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)
class Test_Regex(unittest.TestCase):
    """Tests for wikiq's regex options (``-RP``/``-RPl`` and ``-CP``/``-CPl``).

    Bad option combinations must make wikiq exit with a non-zero status.
    Good combinations are run and their output is saved to ``test_output``;
    no baseline comparison exists for them yet.
    """

    def setUp(self):
        """Record paths and command templates and build the sample option lists."""
        self.wiki = 'regextest'
        self.wikiq_out_name = self.wiki + '.tsv'
        self.infile = "{0}.xml.bz2".format(self.wiki)

        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

        os.makedirs("test_output", exist_ok=True)

        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
        # we have two base calls, one for checking inputs and the other for checking outputs
        self.base_call = "../wikiq {0}"
        self.base_call_outs = "../wikiq {0} -o {1}"

        self.baseline_output_dir = "baseline_output"

        # sample inputs for checking that bad inputs get terminated / test_regex_inputs
        self.bad_input1 = "-RP '\\b\\d+\\b'"  # label is missing
        self.bad_input2 = "-RP 'NPO V' -RP THE -RPl testlabel"  # number of reg and number of labels do not match
        self.bad_input3 = "-CP '(Tamil|Li)' -RPl testlabel"  # cp but rp label
        self.bad_input4 = "-CPl testlabel"  # regex is missing
        self.bad_input5 = "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"

        self.bad_inputs_list = [self.bad_input1, self.bad_input2, self.bad_input3,
                                self.bad_input4, self.bad_input5]

        # sample inputs for checking the outcomes of good inputs / test_basic_regex
        self.good_input1 = "-RP '\\b\\d{3}\\b' -RPl threedigits"
        self.good_input2 = "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word"
        self.good_input3 = "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning"
        self.good_input4 = "-CP 'WP:EVADE' -CPl wp_evade"

        self.good_inputs_list = [self.good_input1, self.good_input2,
                                 self.good_input3, self.good_input4]

        # and with capture group(s) / test_capturegroup_regex
        self.cap_input1 = "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three"
        self.cap_input2 = "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"

        self.cap_inputs_list = [self.cap_input1, self.cap_input2]

    def _run_and_save(self, prefix, regex_args_list):
        """Run wikiq once per option string and keep a numbered copy of each output.

        Copies are named ``<prefix>_<wiki>_<n>.tsv`` with ``n`` counting from 1.
        """
        for index, regex_args in enumerate(regex_args_list, start=1):
            with self.subTest(regex_args=regex_args):
                test_filename = "{0}_{1}_{2}.tsv".format(
                    prefix, self.wikiq_out_name[:-4], index)
                test_file = os.path.join(self.test_output_dir, test_filename)
                # Drop the copy left by an earlier run.
                if os.path.exists(test_file):
                    os.remove(test_file)

                call = self.base_call_outs.format(self.input_file, self.test_output_dir)
                call = call + " " + regex_args

                # subprocess.run blocks until wikiq has exited and drains both
                # pipes, so the output file is complete before it is copied.
                proc = subprocess.run(call, stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE, shell=True)
                self.assertEqual(
                    proc.returncode, 0,
                    msg="wikiq failed for {0!r}:\n{1}".format(
                        call, proc.stderr.decode("utf-8", errors="replace")))

                copyfile(self.call_output, test_file)
                # Truncate wikiq's output file so the next iteration cannot
                # pick up this run's content by accident.
                with open(self.call_output, 'w'):
                    pass

                # TODO: there is no baseline file to compare against yet.
                # baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
                # A proper assertion on the generated file still needs to go
                # here; so far the files were only inspected by hand.

    def test_regex_inputs(self):
        """Every bad option combination makes wikiq exit with a non-zero status."""
        for bad_args in self.bad_inputs_list:
            with self.subTest(regex_args=bad_args):
                call = self.base_call.format(self.input_file)
                call = call + " --stdout " + bad_args

                proc = subprocess.run(call, stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE, shell=True)

                # we want to check that the bad inputs were caught and sys.exit is stopping the code
                self.assertNotEqual(
                    proc.returncode, 0,
                    msg="wikiq accepted bad arguments {0!r}:\n{1}".format(
                        bad_args, proc.stderr.decode("utf-8", errors="replace")))

    def test_basic_regex(self):
        """Run wikiq with each good option string and save the outputs."""
        self._run_and_save("basic", self.good_inputs_list)

    def test_capturegroup_regex(self):
        """Run wikiq with each capture-group option string and save the outputs."""
        self._run_and_save("capturegroup", self.cap_inputs_list)
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()