"""Regression tests for the ``wikiq`` command-line tool.

Every test runs ``../wikiq`` on a dump stored under ``./dumps`` and compares
the tab-separated output with a stored copy under ``./baseline_output``.
All paths are relative to the current working directory, so the suite has to
be started from the directory that contains ``dumps`` and ``baseline_output``.
"""
import os
import subprocess
import unittest
from io import StringIO
from shutil import copyfile

import pandas as pd
# pandas.util.testing was deprecated in pandas 1.0 and removed in 2.0;
# pandas.testing is the supported public location of assert_frame_equal.
from pandas.testing import assert_frame_equal

# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
#        7z, gz, bz2, xml  DONE
# wikia and wikipedia data DONE
# malformed xmls DONE


class WikiqTestCase(unittest.TestCase):
    """Shared fixture and helpers for the wikiq test classes.

    Subclasses set ``wiki`` (dump base name) and ``infile_suffix`` (dump file
    extension).  The class defines no tests of its own, so test runners that
    pick it up have nothing to execute.
    """

    # Base name of the dump; also the base name of wikiq's output file.
    wiki = None
    # Extension appended to ``wiki`` to obtain the dump file name.
    infile_suffix = ".xml"
    # Command template: {0} is the input file, {1} the output directory.
    base_call = "../wikiq {0} -o {1}"
    # Set to False in subclasses that never write into ./test_output.
    uses_output_dir = True

    def setUp(self):
        """Create the output directory and derive all paths from ``wiki``."""
        if self.uses_output_dir:
            # exist_ok avoids the check-then-create race of exists()/mkdir().
            os.makedirs("test_output", exist_ok=True)

        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir,
                                        self.wikiq_out_name)

        self.infile = self.wiki + self.infile_suffix
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    @staticmethod
    def _remove_if_present(path):
        """Delete ``path``; a missing file is not an error."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def run_wikiq(self, call, capture_stderr=False):
        """Run the shell command ``call`` and return the CompletedProcess.

        stdout is always captured (and thereby kept off the terminal).
        ``subprocess.run`` drains the pipes while waiting, unlike
        ``Popen(...).wait()``, which can block forever once the child has
        filled a pipe buffer.
        """
        print(call)
        return subprocess.run(
            call,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE if capture_stderr else None,
            shell=True,
        )

    def wikiq_to_frame(self, test_filename, extra_args=""):
        """Run wikiq with ``extra_args`` and load its output as a DataFrame.

        wikiq's output file is copied to ``test_output/<test_filename>`` so
        that the result of each test is kept under its own name.
        """
        test_file = os.path.join(self.test_output_dir, test_filename)

        # Remove leftovers of earlier runs.  wikiq's own output file is
        # removed as well: several tests share it, and if wikiq fails, the
        # copy below must raise instead of picking up another test's result.
        self._remove_if_present(test_file)
        self._remove_if_present(self.call_output)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        self.run_wikiq(call + extra_args)

        copyfile(self.call_output, test_file)
        return pd.read_table(test_file)

    def assert_matches_baseline(self, test, test_filename):
        """Assert that ``test`` equals ``baseline_output/<test_filename>``."""
        baseline_file = os.path.join(".", self.baseline_output_dir,
                                     test_filename)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def check_against_baseline(self, prefix, extra_args=""):
        """Run wikiq, compare with the baseline and return the test frame.

        ``prefix`` is prepended to ``<wiki>.tsv`` to name both the kept copy
        of the output and the baseline file.
        """
        test_filename = prefix + self.wikiq_out_name
        test = self.wikiq_to_frame(test_filename, extra_args)
        self.assert_matches_baseline(test, test_filename)
        return test

    def assert_leading_values(self, frame, expectations):
        """Assert the first values of several columns of ``frame``.

        ``expectations`` is a sequence of ``(column, expected_values)``
        pairs; ``expected_values[i]`` is compared with ``frame[column][i]``.
        """
        for column, expected in expectations:
            for row, value in enumerate(expected):
                self.assertEqual(frame[column][row], value,
                                 "{0}[{1}]".format(column, row))


class Test_Persistence(WikiqTestCase):
    """Token persistence on the small hand-made ``pwr-test`` dump."""

    wiki = 'pwr-test'

    def test_sequence_persistence(self):
        test_filename = "sequence-" + self.wikiq_out_name
        test = self.wikiq_to_frame(test_filename,
                                   " --url-encode --persistence sequence")

        self.assert_leading_values(test, (
            ('tokens_added', [7, 10, 0, 8, 0]),
            ('tokens_removed', [0, 0, 0, 4, 0]),
            ('token_revs', [8 * 3, 0, 0, 0, 0]),
        ))
        self.assert_matches_baseline(test, test_filename)

    def test_legacy_persistence(self):
        test_filename = "legacy-" + self.wikiq_out_name
        test = self.wikiq_to_frame(test_filename,
                                   " --url-encode --persistence legacy")

        self.assert_leading_values(test, (
            ('tokens_added', [7, 10, 0, 11, 0]),
            ('tokens_removed', [0, 0, 0, 7, 0]),
            ('token_revs', [7 * 3, 0, 0, 0, 0]),
        ))
        self.assert_matches_baseline(test, test_filename)


class Test_Persistence_Bug(WikiqTestCase):
    """Sequence persistence combined with --collapse-user on ``enwiki-test``."""

    wiki = 'enwiki-test'

    def test_sequence_persistence(self):
        self.check_against_baseline(
            "sequence-",
            " --url-encode --persistence sequence --collapse-user")


class Test_Wikipedia(WikiqTestCase):
    """A bz2-compressed Wikipedia dump."""

    wiki = 'ikwiki-20180301-pages-meta-history'
    infile_suffix = ".xml.bz2"

    def test_WP_url_encode(self):
        self.check_against_baseline("url-encode_", " --url-encode")

    def test_WP_namespaces(self):
        test_filename = "namespaces_" + self.wikiq_out_name
        test = self.wikiq_to_frame(test_filename, " -n 0 -n 1")

        # Only the namespaces requested with -n may appear in the output.
        num_wrong_ns = int((~test.namespace.isin([0, 1])).sum())
        self.assertEqual(num_wrong_ns, 0)

        self.assert_matches_baseline(test, test_filename)


class Test_Basic(WikiqTestCase):
    """Option combinations on the 7z-compressed ``sailormoon`` dump."""

    wiki = 'sailormoon'
    infile_suffix = ".xml.7z"

    def test_noargs(self):
        self.check_against_baseline("noargs_")

    def test_collapse_user(self):
        self.check_against_baseline("collapse-user_", " --collapse-user")

    def test_pwr_segment(self):
        test_filename = "persistence_segment_" + self.wikiq_out_name
        test = self.wikiq_to_frame(test_filename, " --persistence segment")
        print(test)
        self.assert_matches_baseline(test, test_filename)

    def test_pwr_legacy(self):
        self.check_against_baseline("persistence_legacy_",
                                    " --persistence legacy")

    def test_pwr(self):
        self.check_against_baseline("persistence_", " --persistence")

    def test_url_encode(self):
        self.check_against_baseline("url-encode_", " --url-encode")


class Test_Malformed(WikiqTestCase):
    """A truncated dump must make wikiq report an XML parse error."""

    wiki = 'twinpeaks'
    infile_suffix = ".xml.7z"

    def test_malformed_noargs(self):
        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = self.run_wikiq(call, capture_stderr=True)

        # Decode the bytes instead of parsing their repr().  The output is
        # expected to end with a newline, so the last message line is the
        # second-to-last element after splitting.
        errlines = proc.stderr.decode("utf8", errors="replace").split("\n")
        self.assertGreaterEqual(
            len(errlines), 2,
            "expected an error message on stderr, got: {0!r}".format(
                proc.stderr))
        self.assertEqual(
            errlines[-2],
            'xml.etree.ElementTree.ParseError: no element found: '
            'line 1369, column 0')


class Test_Stdout(WikiqTestCase):
    """Output written to stdout must equal the no-argument baseline."""

    wiki = 'sailormoon'
    infile_suffix = ".xml.7z"
    base_call = "../wikiq {0} --stdout"
    # Nothing is written to disk, so ./test_output is not needed.
    uses_output_dir = False

    def test_noargs(self):
        call = self.base_call.format(self.input_file)
        proc = self.run_wikiq(call)
        outs = proc.stdout.decode("utf8")

        test_file = "noargs_" + self.wikiq_out_name
        print(os.path.join(".", self.baseline_output_dir, test_file))

        test = pd.read_table(StringIO(outs))
        self.assert_matches_baseline(test, test_file)


if __name__ == '__main__':
    unittest.main()