"""Regression tests for the ``wikiq`` command-line tool.

Each test shells out to ``../wikiq`` on a dump stored under ``dumps/`` and,
for most tests, compares the TSV it produces with a stored file under
``baseline_output/``.  All paths are relative, so the suite has to be started
from the directory that contains ``dumps/`` and ``baseline_output/``.
"""
import os
import subprocess
import unittest
from io import StringIO
from shutil import copyfile

import pandas as pd
from pandas.testing import assert_frame_equal

# Coverage checklist:
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE

# Command template for runs that write ``<wiki>.tsv`` into an output directory.
_FILE_CALL = "../wikiq {0} -o {1}"
_INPUT_DIR = "dumps"
_TEST_OUTPUT_DIR = "test_output"
_BASELINE_OUTPUT_DIR = "baseline_output"


def _run_shell(call):
    """Run *call* through the shell and return the ``CompletedProcess``.

    Both streams are captured.  ``subprocess.run`` drains the pipes while it
    waits, so a chatty child cannot block on a full pipe the way it can with
    ``Popen(..., stdout=PIPE)`` followed by ``wait()``.
    """
    return subprocess.run(call, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, shell=True)


class _WikiqFileTest(unittest.TestCase):
    """Shared fixture for tests that let wikiq write a TSV file.

    Subclasses set ``wiki`` (dump base name) and ``dump_suffix`` (extension of
    the compressed dump, including the leading dot).
    """

    wiki = None
    dump_suffix = None

    def setUp(self):
        os.makedirs(_TEST_OUTPUT_DIR, exist_ok=True)

        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", _TEST_OUTPUT_DIR)
        # wikiq names its output after the dump; every test of one class
        # therefore writes to this same path before the result is copied.
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = self.wiki + self.dump_suffix
        self.base_call = _FILE_CALL
        self.input_dir = _INPUT_DIR
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = _BASELINE_OUTPUT_DIR

    def _wikiq_to_frame(self, test_filename, extra_args=""):
        """Run wikiq with *extra_args* and return its output as a DataFrame.

        The output is copied to ``test_output/<test_filename>`` so that each
        test keeps its own result file.  The run must exit with status 0;
        otherwise a stale ``call_output`` from an earlier test could be
        copied and compared instead of fresh output.
        """
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = _FILE_CALL.format(self.input_file, self.test_output_dir)
        if extra_args:
            call = call + " " + extra_args

        proc = _run_shell(call)
        self.assertEqual(
            proc.returncode, 0,
            msg="wikiq failed for {0!r}:\n{1}".format(
                call, proc.stderr.decode("utf8", errors="replace")))

        copyfile(self.call_output, test_file)
        return pd.read_table(test_file)

    def _read_baseline(self, test_filename):
        """Return the stored baseline that has the same name as the test file."""
        return pd.read_table(
            os.path.join(".", self.baseline_output_dir, test_filename))

    def _assert_matches_baseline(self, test_filename, extra_args=""):
        """Run wikiq and require its output to equal the stored baseline."""
        test = self._wikiq_to_frame(test_filename, extra_args)
        assert_frame_equal(test, self._read_baseline(test_filename))


class Test_Wikipedia(_WikiqFileTest):
    """Tests against a Wikipedia dump (bz2 compressed)."""

    wiki = 'ikwiki-20180301-pages-meta-history'
    dump_suffix = ".xml.bz2"

    def _assert_namespaces_and_baseline(self, test_filename, extra_args):
        """Require that only namespaces 0 and 1 appear, then compare frames."""
        test = self._wikiq_to_frame(test_filename, extra_args)
        num_wrong_ns = int((~test.namespace.isin({0, 1})).sum())
        self.assertEqual(num_wrong_ns, 0)
        assert_frame_equal(test, self._read_baseline(test_filename))

    def test_WP_url_encode(self):
        self._assert_matches_baseline(
            "url-encode_" + self.wikiq_out_name, "--url-encode")

    def test_WP_namespaces(self):
        self._assert_namespaces_and_baseline(
            "namespaces_" + self.wikiq_out_name, "-n 0 -n 1")

    def test_WP_revert_radius(self):
        self._assert_namespaces_and_baseline(
            "revert_radius_" + self.wikiq_out_name, "-n 0 -n 1 -rr 1")


class Test_Basic(_WikiqFileTest):
    """Tests against the ``sailormoon`` dump (7z compressed)."""

    wiki = 'sailormoon'
    dump_suffix = ".xml.7z"

    def test_noargs(self):
        self._assert_matches_baseline("noargs_" + self.wikiq_out_name)

    def test_collapse_user(self):
        self._assert_matches_baseline(
            "collapse-user_" + self.wikiq_out_name, "--collapse-user")

    def test_pwr_segment(self):
        self._assert_matches_baseline(
            "persistence_segment_" + self.wikiq_out_name, "--persistence segment")

    def test_pwr_legacy(self):
        self._assert_matches_baseline(
            "persistence_legacy_" + self.wikiq_out_name, "--persistence legacy")

    def test_pwr(self):
        self._assert_matches_baseline(
            "persistence_" + self.wikiq_out_name, "--persistence")

    def test_url_encode(self):
        self._assert_matches_baseline(
            "url-encode_" + self.wikiq_out_name, "--url-encode")


class Test_Malformed(_WikiqFileTest):
    """wikiq must surface the XML parse error of a truncated dump."""

    wiki = 'twinpeaks'
    dump_suffix = ".xml.7z"

    def test_malformed_noargs(self):
        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = _run_shell(call)

        # Decode the captured bytes instead of splitting their repr().
        errlines = proc.stderr.decode("utf8", errors="replace").split("\n")
        # The traceback ends with a newline, so the message is second to last.
        self.assertGreaterEqual(len(errlines), 2, msg="wikiq wrote nothing to stderr")
        self.assertEqual(errlines[-2], 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')


class Test_Stdout(unittest.TestCase):
    """``--stdout`` must emit the same table that the file output produces."""

    def setUp(self):
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = _INPUT_DIR
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = _BASELINE_OUTPUT_DIR

    def test_noargs(self):
        call = self.base_call.format(self.input_file)
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        outs = proc.stdout.decode("utf8")

        # Shares its baseline with Test_Basic.test_noargs.
        baseline_file = os.path.join(
            ".", self.baseline_output_dir, "noargs_" + self.wikiq_out_name)
        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)


class Test_Regex(_WikiqFileTest):
    """Tests for the revision (-RP/-RPl) and comment (-CP/-CPl) regex options."""

    wiki = 'emptytext'
    dump_suffix = ".xml.bz2"

    def setUp(self):
        super().setUp()
        # we have two base calls, one for checking inputs and the other for checking outputs
        self.base_call = "../wikiq {0}"
        self.base_call_outs = _FILE_CALL

        # sample inputs for checking that bad inputs get terminated / test_regex_inputs
        self.bad_inputs_list = [
            # label is missing
            "-RP '\\b\\d+\\b'",
            # number of reg and number of labels do not match
            "-RP 'NPO V' -RP THE -RPl testlabel",
            # cp but rp label
            "-CP '(Tamil|Li)' -RPl testlabel",
            # regex is missing
            "-CPl testlabel",
            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'",
        ]

        # sample inputs for checking the outcomes of good inputs / test_basic_regex
        self.good_inputs_list = [
            "-RP '\\b\\d{3}\\b' -RPl threedigits",
            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
            "-CP 'WP:EVADE' -CPl wp_evade",
        ]

        # NOTE(review): the copy of this file these patterns were recovered
        # from showed the named groups as "(?P\\b...)", which is not valid
        # regex syntax; the "<name>" parts look like they were stripped as
        # markup.  The names below are a reconstruction -- confirm them
        # against the repository before relying on the output column names.
        self.cap_inputs_list = [
            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov",
        ]

    def _check_outputs(self, prefix, inputs):
        """Run wikiq once per entry of *inputs* and load result and baseline.

        NOTE(review): the frame comparison against the baseline was commented
        out in the original tests and stays disabled here; only a successful
        run and two readable TSV files are required.  Re-enable
        ``assert_frame_equal`` once the baselines are confirmed.
        """
        for i, regex_args in enumerate(inputs):
            # subTest keeps one failing case from hiding the remaining ones.
            with self.subTest(args=regex_args):
                test_filename = "{0}_{1}_{2}.tsv".format(prefix, self.wiki, i)
                self._wikiq_to_frame(test_filename, regex_args)
                self._read_baseline(test_filename)

    def test_regex_inputs(self):
        for regex_args in self.bad_inputs_list:
            with self.subTest(args=regex_args):
                call = self.base_call.format(self.input_file)
                call = call + " --stdout " + regex_args
                proc = _run_shell(call)

                # we want to check that the bad inputs were caught and sys.exit is stopping the code
                self.assertNotEqual(proc.returncode, 0)

    def test_basic_regex(self):
        self._check_outputs("basic", self.good_inputs_list)

    def test_capturegroup_regex(self):
        self._check_outputs("capturegroup", self.cap_inputs_list)


if __name__ == '__main__':
    unittest.main()