"""Regression tests for the ``wikiq`` command-line tool.

Every test shells out to ``../wikiq`` on an archive from ``dumps/`` and
compares what it produces with a stored table in ``baseline_output/``.
All paths are relative, so the suite must be started from the directory
that contains this file.

Recovered from the gitweb diff that added ``tests/Wikiq_Test.py`` to
https://code.communitydata.science/mediawiki_dump_tools.git
"""
import os
import subprocess
import unittest
from io import StringIO
from shutil import copyfile

import pandas as pd

try:
    from pandas.testing import assert_frame_equal
except ImportError:  # old pandas releases only ship it under pandas.util.testing
    from pandas.util.testing import assert_frame_equal

# Star-imports expose only the reusable helper. The TestCase classes are
# found by test discovery, which does not consult ``__all__``.
__all__ = ["last_error_line"]

# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
#        7z, gz, bz2, xml  DONE
# wikia and wikipedia data DONE
# malformed xmls DONE


def last_error_line(stderr):
    """Return the last non-blank line of a child process's captured stderr.

    Args:
        stderr: The captured stream, as ``bytes`` (decoded as UTF-8 with
            undecodable bytes replaced) or as ``str``. ``None`` is treated
            as empty.

    Returns:
        The final line that contains anything other than whitespace, without
        its line terminator, or ``""`` if there is no such line.
    """
    if stderr is None:
        return ""
    if isinstance(stderr, bytes):
        stderr = stderr.decode("utf8", errors="replace")
    non_blank = [line for line in stderr.splitlines() if line.strip()]
    return non_blank[-1] if non_blank else ""


def _remove_if_present(path):
    """Delete ``path``; a file that is already absent is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


class _WikiqFileOutputCase(unittest.TestCase):
    """Shared fixture for tests that let wikiq write a TSV into ``test_output``.

    Subclasses set ``wiki`` (the dump's base name) and, when the archive is
    not a ``.xml.7z`` file, ``infile_suffix``. This class has no test methods
    of its own.
    """

    wiki = None
    infile_suffix = ".xml.7z"
    base_call = "../wikiq {0} -o {1}"

    def setUp(self):
        """Derive all input/output paths from ``wiki`` and create the output dir."""
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}{1}".format(self.wiki, self.infile_suffix)
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

        os.makedirs(self.test_output_dir, exist_ok=True)

    def _wikiq_call(self, *flags):
        """Return the shell command for this dump with ``flags`` appended."""
        call = self.base_call.format(self.input_file, self.test_output_dir)
        return " ".join((call,) + flags)

    def _assert_matches_baseline(self, prefix, *flags):
        """Run wikiq with ``flags`` and compare its TSV with the stored baseline.

        wikiq's output is copied to ``<prefix><wiki>.tsv`` in the output
        directory and must equal the file of the same name in the baseline
        directory.

        Raises:
            FileNotFoundError: if wikiq did not produce its output file.
            AssertionError: if the two tables differ.
        """
        test_filename = prefix + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)

        # Every test of one class writes to the same ``call_output`` path, so
        # a leftover from an earlier test must go as well; otherwise a failed
        # run would silently be judged on stale data.
        _remove_if_present(test_file)
        _remove_if_present(self.call_output)

        # stdout is never inspected here. Discarding it (instead of piping it
        # and then only waiting) means the child can never block on a full pipe.
        subprocess.run(self._wikiq_call(*flags), stdout=subprocess.DEVNULL, shell=True)

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        # as a test let's make sure that we get equal data frames
        assert_frame_equal(pd.read_table(test_file), pd.read_table(baseline_file))


class Test_Wikipedia(_WikiqFileOutputCase):
    """Runs against a bz2-compressed Wikipedia dump."""

    wiki = 'ikwiki-20180301-pages-meta-history'
    infile_suffix = ".xml.bz2"

    def test_WP_url_encode(self):
        self._assert_matches_baseline("url-encode_", "--url-encode")


class Test_Basic(_WikiqFileOutputCase):
    """Runs against the 7z-compressed ``sailormoon`` dump, one flag per test."""

    wiki = 'sailormoon'

    def test_noargs(self):
        self._assert_matches_baseline("noargs_")

    def test_collapse_user(self):
        self._assert_matches_baseline("collapse-user_", "--collapse-user")

    def test_pwr_legacy(self):
        self._assert_matches_baseline("persistence_legacy_", "--persistence-legacy")

    def test_pwr(self):
        self._assert_matches_baseline("persistence_", "--persistence")

    def test_url_encode(self):
        self._assert_matches_baseline("url-encode_", "--url-encode")


class Test_Malformed(_WikiqFileOutputCase):
    """Runs against the ``twinpeaks`` dump, whose XML is malformed."""

    wiki = 'twinpeaks'

    def test_malformed_noargs(self):
        """wikiq's stderr must end with the XML parse error."""
        # subprocess.run drains both pipes while it waits, so a long traceback
        # cannot stall the child.
        proc = subprocess.run(
            self._wikiq_call(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
        )
        self.assertEqual(
            last_error_line(proc.stderr),
            'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0',
        )


class Test_Stdout(unittest.TestCase):
    """Checks that ``--stdout`` emits the same table as the file output."""

    def setUp(self):
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        call = self.base_call.format(self.input_file)
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        outs = proc.stdout.decode("utf8")

        # The stdout table is compared with the baseline of the no-flag file run.
        test_file = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)


if __name__ == '__main__':
    unittest.main()