X-Git-Url: https://code.communitydata.science/mediawiki_dump_tools.git/blobdiff_plain/e925ac9da10b20ab4ef949263887a1f9158e805f..414cc5ff2d86c007b1a00c829c779627ea3b18d7:/test/Wikiq_Unit_Test.py diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py index b32d6a9..14d38f1 100644 --- a/test/Wikiq_Unit_Test.py +++ b/test/Wikiq_Unit_Test.py @@ -1,9 +1,10 @@ import unittest import os -import sys import subprocess from shutil import copyfile -import pdb +import pandas as pd +from pandas.util.testing import assert_frame_equal +from io import StringIO # with / without pwr DONE # with / without url encode DONE @@ -32,21 +33,68 @@ class Test_Wikipedia(unittest.TestCase): self.baseline_output_dir = "baseline_output" def test_WP_url_encode(self): + test_filename = "url-encode_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) call = call + " --url-encode" proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) proc.wait() - test_file = "url-encode_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + copyfile(self.call_output, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + + # as a test let's make sure that we get equal data frames + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + + def test_WP_namespaces(self): + print(os.path.abspath('.')) + test_filename = "namespaces_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call.format(self.input_file, self.test_output_dir) + call = call + " -n 0 -n 1" + print(call) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() + copyfile(self.call_output, test_file) + baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename) + + # as a test let's make sure that we get equal data frames + test = pd.read_table(test_file) + num_wrong_ns = sum(~ test.namespace.isin({0,1})) + self.assertEqual(num_wrong_ns, 0) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + + def test_WP_revert_radius(self): + print(os.path.abspath('.')) + test_filename = "revert_radius_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call.format(self.input_file, self.test_output_dir) + call = call + " -n 0 -n 1 -rr 1" + print(call) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() + copyfile(self.call_output, test_file) + baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename) + + # as a test let's make sure that we get equal data frames + test = pd.read_table(test_file) + num_wrong_ns = sum(~ test.namespace.isin({0,1})) + self.assertEqual(num_wrong_ns, 0) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) - test_lines.close() - baseline_lines.close() class Test_Basic(unittest.TestCase): @@ -68,80 +116,124 @@ class Test_Basic(unittest.TestCase): def test_noargs(self): + test_filename = "noargs_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) proc.wait() - test_file = "noargs_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + copyfile(self.call_output, test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) - test_lines.close() - baseline_lines.close() def test_collapse_user(self): + test_filename = "collapse-user_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) call = call + " --collapse-user" proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) proc.wait() - test_file = "collapse-user_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) + copyfile(self.call_output, test_file) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + + def test_pwr_segment(self): + test_filename = "persistence_segment_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call.format(self.input_file, self.test_output_dir) + call = call + " --persistence segment" + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() + + + copyfile(self.call_output, test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + + def test_pwr_legacy(self): + test_filename = "persistence_legacy_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call.format(self.input_file, self.test_output_dir) + call = call + " --persistence legacy" + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) - test_lines.close() - baseline_lines.close() + copyfile(self.call_output, test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) def test_pwr(self): + test_filename = "persistence_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) call = call + " --persistence" proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) proc.wait() - test_file = "persistence_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + copyfile(self.call_output, test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) - test_lines.close() - baseline_lines.close() def test_url_encode(self): + test_filename = "url-encode_" + self.wikiq_out_name + + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) call = call + " --url-encode" proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) proc.wait() - test_file = "url-encode_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + copyfile(self.call_output, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) - test_lines.close() - baseline_lines.close() class Test_Malformed(unittest.TestCase): - def setUp(self): if not os.path.exists("test_output"): os.mkdir("test_output") @@ -182,28 +274,126 @@ class Test_Stdout(unittest.TestCase): call = self.base_call.format(self.input_file) proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True) - outs = proc.stdout.decode('utf-8') - + outs = proc.stdout.decode("utf8") + test_file = "noargs_" + self.wikiq_out_name baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + print(baseline_file) + test = pd.read_table(StringIO(outs)) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + +class Test_Regex(unittest.TestCase): + + def setUp(self): + self.wiki = 'regextest' + self.wikiq_out_name = self.wiki + '.tsv' + self.infile = "{0}.xml.bz2".format(self.wiki) + + self.input_dir = "dumps" + self.input_file = os.path.join(".", self.input_dir,self.infile) - test_lines = outs.splitlines(True) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + if not os.path.exists("test_output"): + os.mkdir("test_output") - # test_file = "noargs_" + self.wikiq_out_name - # copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) + self.test_output_dir = os.path.join(".", "test_output") + self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name) + # we have two base calls, one for checking inputs and the other for checking outputs + self.base_call = "../wikiq {0}" + self.base_call_outs = "../wikiq {0} -o {1}" - # baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + self.baseline_output_dir = "baseline_output" - # test_lines = open(os.path.join(self.test_output_dir,test_file)) - # baseline_lines = open(baseline_file) - # for test, baseline in zip(test_lines, baseline_lines): - # self.assertEqual(test,baseline) - # test_lines.close() - # baseline_lines.close() + # sample inputs for checking that bad inputs get terminated / test_regex_inputs + self.bad_inputs_list = [ + #label is missing + "-RP '\\b\\d+\\b'", + #number of reg and number of labels do not match + "-RP 'NPO V' -RP THE -RPl testlabel", + #cp but rp label + "-CP '(Tamil|Li)' -RPl testlabel", + #regex is missing + "-CPl testlabel", + "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'" + ] + + # sample inputs for checking the outcomes of good inputs / test_basic_regex + self.good_inputs_list = [ + "-RP '\\b\\d{3}\\b' -RPl threedigits", + "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word", + "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning", + "-CP 'WP:EVADE' -CPl wp_evade" + ] + self.cap_inputs_list = [ + "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P\\b[a-zA-Z]{3}\\b)|(?P\\b\\d+\\b)|(?P\\bcat\\b)' -CPl three", + "-CP '(?P\\bTestCaseA\\b)|(?P\\bTestCaseB\\b)|(?P\\bTestCaseC\\b)|(?P\\bTestCaseD\\b)' -CPl testcase -RP '(?Pnpov|NPOV)|(?Pneutral point of view)' -RPl npov" + ] + + + + def test_regex_inputs(self): + for input in self.bad_inputs_list: + call = self.base_call.format(self.input_file) + call = call + " --stdout " + input + print(call) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) + stdout,stderr = proc.communicate() + #print(proc.returncode) + + # we want to check that the bad inputs were caught and sys.exit is stopping the code + print(stderr.decode("utf-8")) + self.assertNotEqual(proc.returncode,0) + + def test_basic_regex(self): + for i, input in enumerate(self.good_inputs_list): + + test_filename = "basic_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i)) + #print(test_filename) + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call_outs.format(self.input_file, self.test_output_dir) + call = call + " " + input + print(call) + + proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) + proc.wait() + copyfile(self.call_output, test_file) + + test = pd.read_table(test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test, baseline) + print(i) + + + def test_capturegroup_regex(self): + for i, input in enumerate(self.cap_inputs_list): + test_filename = "capturegroup_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i)) + print(test_filename) + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call_outs.format(self.input_file, self.test_output_dir) + call = call + " " + input + print(call) + + proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) + proc.wait() + + copyfile(self.call_output, test_file) + + test = pd.read_table(test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test, baseline) + + if __name__ == '__main__': unittest.main()