X-Git-Url: https://code.communitydata.science/mediawiki_dump_tools.git/blobdiff_plain/d2746879d09ca64418de41f34a33b11b205a101d..776b73519ac8af42b64875a5cc27125ceff7c861:/test/Wikiq_Unit_Test.py diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py index 3a1e24b..0826f1f 100644 --- a/test/Wikiq_Unit_Test.py +++ b/test/Wikiq_Unit_Test.py @@ -1,17 +1,56 @@ import unittest import os -import sys +import subprocess from shutil import copyfile +import pandas as pd +from pandas.util.testing import assert_frame_equal +from io import StringIO # with / without pwr DONE # with / without url encode DONE # with / without collapse user DONE -# with output to sdtout +# with output to sdtout DONE # note that the persistence radius is 7 by default # reading various file formats including -# 7z, gz, bz2, xml -# wikia and wikipedia data -# malformed xmls +# 7z, gz, bz2, xml DONE +# wikia and wikipedia data DONE +# malformed xmls DONE + +class Test_Wikipedia(unittest.TestCase): + def setUp(self): + if not os.path.exists("test_output"): + os.mkdir("test_output") + + self.wiki = 'ikwiki-20180301-pages-meta-history' + self.wikiq_out_name = self.wiki + ".tsv" + self.test_output_dir = os.path.join(".", "test_output") + self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name) + + self.infile = "{0}.xml.bz2".format(self.wiki) + self.base_call = "../wikiq {0} -o {1}" + self.input_dir = "dumps" + self.input_file = os.path.join(".", self.input_dir,self.infile) + self.baseline_output_dir = "baseline_output" + + def test_WP_url_encode(self): + test_filename = "url-encode_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call.format(self.input_file, self.test_output_dir) + call = call + " --url-encode" + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() + + copyfile(self.call_output, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + + # as a test let's make sure that we get equal data frames + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + class Test_Basic(unittest.TestCase): @@ -32,77 +71,173 @@ class Test_Basic(unittest.TestCase): def test_noargs(self): + test_filename = "noargs_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) - os.system(call) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() - test_file = "noargs_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) + copyfile(self.call_output, test_file) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) - test_lines.close() - baseline_lines.close() def test_collapse_user(self): + test_filename = "collapse-user_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) - os.system(call) call = call + " --collapse-user" - os.system(call) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() - test_file = "collapse-user_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) + copyfile(self.call_output, test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + + def test_pwr_segment(self): + test_filename = "persistence_segment_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call.format(self.input_file, self.test_output_dir) + call = call + " --persistence segment" + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + copyfile(self.call_output, test_file) - test_lines.close() - baseline_lines.close() + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + + def test_pwr_legacy(self): + test_filename = "persistence_legacy_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call.format(self.input_file, self.test_output_dir) + call = call + " --persistence legacy" + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() + + + copyfile(self.call_output, test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) def test_pwr(self): + test_filename = "persistence_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) call = call + " --persistence" - os.system(call) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() - test_file = "persistence_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + copyfile(self.call_output, test_file) + + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) - test_lines.close() - baseline_lines.close() def test_url_encode(self): + test_filename = "url-encode_" + self.wikiq_out_name + + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) call = call + " --url-encode" - os.system(call) - test_file = "url-encode_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() + + copyfile(self.call_output, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + test = pd.read_table(test_file) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) + + +class Test_Malformed(unittest.TestCase): + + def setUp(self): + if not os.path.exists("test_output"): + os.mkdir("test_output") + + self.wiki = 'twinpeaks' + self.wikiq_out_name = self.wiki + ".tsv" + self.test_output_dir = os.path.join(".", "test_output") + self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name) + + self.infile = "{0}.xml.7z".format(self.wiki) + self.base_call = "../wikiq {0} -o {1}" + self.input_dir = "dumps" + self.input_file = os.path.join(".", self.input_dir,self.infile) + + + def test_malformed_noargs(self): + + call = self.base_call.format(self.input_file, self.test_output_dir) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True) + proc.wait() + outs, errs = proc.communicate() + errlines = str(errs).split("\\n") + self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0') + +class Test_Stdout(unittest.TestCase): + + def setUp(self): + self.wiki = 'sailormoon' + self.wikiq_out_name = self.wiki + ".tsv" - test_lines = open(os.path.join(self.test_output_dir,test_file)) - baseline_lines = open(baseline_file) - for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + self.infile = "{0}.xml.7z".format(self.wiki) + self.base_call = "../wikiq {0} --stdout" + self.input_dir = "dumps" + self.input_file = os.path.join(".", self.input_dir,self.infile) + self.baseline_output_dir = "baseline_output" - test_lines.close() - baseline_lines.close() + def test_noargs(self): + call = self.base_call.format(self.input_file) + proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True) + outs = proc.stdout.decode("utf8") + test_file = "noargs_" + self.wikiq_out_name + baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + print(baseline_file) + test = pd.read_table(StringIO(outs)) + baseline = pd.read_table(baseline_file) + assert_frame_equal(test,baseline) if __name__ == '__main__': unittest.main()