X-Git-Url: https://code.communitydata.science/mediawiki_dump_tools.git/blobdiff_plain/d2746879d09ca64418de41f34a33b11b205a101d..d77b0a4965e59a3e5f84956b27dad58d98110c06:/test/Wikiq_Unit_Test.py diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py index 3a1e24b..ab0a532 100644 --- a/test/Wikiq_Unit_Test.py +++ b/test/Wikiq_Unit_Test.py @@ -1,17 +1,58 @@ import unittest import os import sys +import subprocess from shutil import copyfile +import pdb # with / without pwr DONE # with / without url encode DONE # with / without collapse user DONE -# with output to sdtout +# with output to sdtout DONE # note that the persistence radius is 7 by default # reading various file formats including -# 7z, gz, bz2, xml -# wikia and wikipedia data -# malformed xmls +# 7z, gz, bz2, xml DONE +# wikia and wikipedia data DONE +# malformed xmls DONE + +class Test_Wikipedia(unittest.TestCase): + def setUp(self): + if not os.path.exists("test_output"): + os.mkdir("test_output") + + self.wiki = 'ikwiki-20180301-pages-meta-history' + self.wikiq_out_name = self.wiki + ".tsv" + self.test_output_dir = os.path.join(".", "test_output") + self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name) + + self.infile = "{0}.xml.bz2".format(self.wiki) + self.base_call = "../wikiq {0} -o {1}" + self.input_dir = "dumps" + self.input_file = os.path.join(".", self.input_dir,self.infile) + self.baseline_output_dir = "baseline_output" + + def test_WP_url_encode(self): + test_filename = "url-encode_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + + call = self.base_call.format(self.input_file, self.test_output_dir) + call = call + " --url-encode" + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() + + copyfile(self.call_output, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + + test_lines = open(test_file) + baseline_lines = open(baseline_file) + for test, baseline in zip(test_lines, baseline_lines): + self.assertEqual(test,baseline) + + test_lines.close() + baseline_lines.close() + class Test_Basic(unittest.TestCase): @@ -32,35 +73,44 @@ class Test_Basic(unittest.TestCase): def test_noargs(self): + test_filename = "noargs_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) - os.system(call) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() - test_file = "noargs_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) + copyfile(self.call_output, test_file) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) - test_lines = open(os.path.join(self.test_output_dir,test_file)) + test_lines = open(test_file) baseline_lines = open(baseline_file) for test, baseline in zip(test_lines, baseline_lines): - self.assertEqual(test,baseline) + self.assertEqual(test, baseline) test_lines.close() baseline_lines.close() def test_collapse_user(self): + test_filename = "collapse-user_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) - os.system(call) call = call + " --collapse-user" - os.system(call) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() - test_file = "collapse-user_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) + copyfile(self.call_output, test_file) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) - test_lines = open(os.path.join(self.test_output_dir,test_file)) + test_lines = open(test_file) baseline_lines = open(baseline_file) for test, baseline in zip(test_lines, baseline_lines): self.assertEqual(test,baseline) @@ -68,17 +118,23 @@ class Test_Basic(unittest.TestCase): test_lines.close() baseline_lines.close() - def test_pwr(self): + def test_pwr_legacy(self): + test_filename = "persistence_legacy_" + self.wikiq_out_name + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) - call = call + " --persistence" - os.system(call) + call = call + " --persistence-legacy" + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() - test_file = "persistence_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + copyfile(self.call_output, test_file) - test_lines = open(os.path.join(self.test_output_dir,test_file)) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) + + test_lines = open(test_file) baseline_lines = open(baseline_file) for test, baseline in zip(test_lines, baseline_lines): self.assertEqual(test,baseline) @@ -87,14 +143,21 @@ class Test_Basic(unittest.TestCase): baseline_lines.close() def test_url_encode(self): + test_filename = "url-encode_" + self.wikiq_out_name + + test_file = os.path.join(self.test_output_dir, test_filename) + if os.path.exists(test_file): + os.remove(test_file) + call = self.base_call.format(self.input_file, self.test_output_dir) call = call + " --url-encode" - os.system(call) - test_file = "url-encode_" + self.wikiq_out_name - copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) - baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True) + proc.wait() + + copyfile(self.call_output, test_file) + baseline_file = os.path.join(".", self.baseline_output_dir, test_filename) - test_lines = open(os.path.join(self.test_output_dir,test_file)) + test_lines = open(test_file) baseline_lines = open(baseline_file) for test, baseline in zip(test_lines, baseline_lines): self.assertEqual(test,baseline) @@ -102,6 +165,83 @@ class Test_Basic(unittest.TestCase): test_lines.close() baseline_lines.close() +class Test_Malformed(unittest.TestCase): + + def setUp(self): + if not os.path.exists("test_output"): + os.mkdir("test_output") + + self.wiki = 'twinpeaks' + self.wikiq_out_name = self.wiki + ".tsv" + self.test_output_dir = os.path.join(".", "test_output") + self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name) + + self.infile = "{0}.xml.7z".format(self.wiki) + self.base_call = "../wikiq {0} -o {1}" + self.input_dir = "dumps" + self.input_file = os.path.join(".", self.input_dir,self.infile) + + + def test_malformed_noargs(self): + + call = self.base_call.format(self.input_file, self.test_output_dir) + proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True) + proc.wait() + outs, errs = proc.communicate() + errlines = str(errs).split("\\n") + self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0') + +class Test_Stdout(unittest.TestCase): + + def setUp(self): + self.wiki = 'sailormoon' + self.wikiq_out_name = self.wiki + ".tsv" + + self.infile = "{0}.xml.7z".format(self.wiki) + self.base_call = "../wikiq {0} --stdout" + self.input_dir = "dumps" + self.input_file = os.path.join(".", self.input_dir,self.infile) + self.baseline_output_dir = "baseline_output" + + def test_noargs(self): + + call = self.base_call.format(self.input_file) + proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True) + outs = proc.stdout.decode('utf-8') + + test_file = "noargs_" + self.wikiq_out_name + baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + + test_lines = outs.splitlines(True) + baseline_lines = open(baseline_file) + for test, baseline in zip(test_lines, baseline_lines): + self.assertEqual(test,baseline) + + def test_persistence(self): + + call = self.base_call.format(self.input_file) + " --persistence" + proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True) + outs = proc.stdout.decode('utf-8') + + test_file = "persistence_" + self.wikiq_out_name + baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + + test_lines = outs.splitlines(True) + baseline_lines = open(baseline_file) + for test, baseline in zip(test_lines, baseline_lines): + self.assertEqual(test,baseline) + + # test_file = "noargs_" + self.wikiq_out_name + # copyfile(self.call_output, os.path.join(self.test_output_dir, test_file)) + + # baseline_file = os.path.join(".", self.baseline_output_dir, test_file) + + # test_lines = open(os.path.join(self.test_output_dir,test_file)) + # baseline_lines = open(baseline_file) + # for test, baseline in zip(test_lines, baseline_lines): + # self.assertEqual(test,baseline) + # test_lines.close() + # baseline_lines.close() if __name__ == '__main__':