]> code.communitydata.science - mediawiki_dump_tools.git/blobdiff - test/Wikiq_Unit_Test.py
fix bugs and unit tests
[mediawiki_dump_tools.git] / test / Wikiq_Unit_Test.py
index 9011d799831548c1def240c823aa7a4ba3945d8d..0a90c6cdec03c1b9492b4d9cc8251bad808fc1b2 100644 (file)
@@ -3,7 +3,7 @@ import os
 import subprocess
 from shutil import copyfile
 import pandas as pd
 import subprocess
 from shutil import copyfile
 import pandas as pd
-from pandas.util.testing import assert_frame_equal
+from pandas.testing import assert_frame_equal
 from io import StringIO
 
 # with / without pwr DONE
 from io import StringIO
 
 # with / without pwr DONE
@@ -49,7 +49,7 @@ class Test_Wikipedia(unittest.TestCase):
         # as a test let's make sure that we get equal data frames
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
         # as a test let's make sure that we get equal data frames
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+        assert_frame_equal(test,baseline, check_like=True)
 
     def test_WP_namespaces(self):
         print(os.path.abspath('.'))
 
     def test_WP_namespaces(self):
         print(os.path.abspath('.'))
@@ -71,7 +71,7 @@ class Test_Wikipedia(unittest.TestCase):
         num_wrong_ns = sum(~ test.namespace.isin({0,1}))
         self.assertEqual(num_wrong_ns, 0)
         baseline = pd.read_table(baseline_file)
         num_wrong_ns = sum(~ test.namespace.isin({0,1}))
         self.assertEqual(num_wrong_ns, 0)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+        assert_frame_equal(test,baseline, check_like=True)
 
     def test_WP_revert_radius(self):
         print(os.path.abspath('.'))
 
     def test_WP_revert_radius(self):
         print(os.path.abspath('.'))
@@ -93,7 +93,7 @@ class Test_Wikipedia(unittest.TestCase):
         num_wrong_ns = sum(~ test.namespace.isin({0,1}))
         self.assertEqual(num_wrong_ns, 0)
         baseline = pd.read_table(baseline_file)
         num_wrong_ns = sum(~ test.namespace.isin({0,1}))
         self.assertEqual(num_wrong_ns, 0)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+        assert_frame_equal(test,baseline, check_like=True)
 
 
 
 
 
 
@@ -131,7 +131,7 @@ class Test_Basic(unittest.TestCase):
 
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
 
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+        assert_frame_equal(test,baseline, check_like=True)
 
 
     def test_collapse_user(self):
 
 
     def test_collapse_user(self):
@@ -151,7 +151,7 @@ class Test_Basic(unittest.TestCase):
         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+        assert_frame_equal(test,baseline, check_like=True)
 
     def test_pwr_segment(self):
         test_filename =  "persistence_segment_" + self.wikiq_out_name
 
     def test_pwr_segment(self):
         test_filename =  "persistence_segment_" + self.wikiq_out_name
@@ -171,7 +171,7 @@ class Test_Basic(unittest.TestCase):
 
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
 
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+        assert_frame_equal(test,baseline, check_like=True)
 
     def test_pwr_legacy(self):
         test_filename =  "persistence_legacy_" + self.wikiq_out_name
 
     def test_pwr_legacy(self):
         test_filename =  "persistence_legacy_" + self.wikiq_out_name
@@ -191,7 +191,7 @@ class Test_Basic(unittest.TestCase):
 
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
 
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+        assert_frame_equal(test,baseline, check_like=True)
 
     def test_pwr(self):
         test_filename =  "persistence_" + self.wikiq_out_name
 
     def test_pwr(self):
         test_filename =  "persistence_" + self.wikiq_out_name
@@ -211,7 +211,9 @@ class Test_Basic(unittest.TestCase):
 
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
 
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+
+        test = test.reindex(columns=sorted(test.columns))
+        assert_frame_equal(test,baseline, check_like=True)
 
 
     def test_url_encode(self):
 
 
     def test_url_encode(self):
@@ -230,7 +232,9 @@ class Test_Basic(unittest.TestCase):
         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
         test = pd.read_table(test_file)
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+
+        test = test.reindex(columns=sorted(test.columns))
+        assert_frame_equal(test,baseline, check_like=True)
 
 
 class Test_Malformed(unittest.TestCase):
 
 
 class Test_Malformed(unittest.TestCase):
@@ -241,7 +245,7 @@ class Test_Malformed(unittest.TestCase):
         self.wiki = 'twinpeaks'
         self.wikiq_out_name =  self.wiki + ".tsv"
         self.test_output_dir = os.path.join(".", "test_output")
         self.wiki = 'twinpeaks'
         self.wikiq_out_name =  self.wiki + ".tsv"
         self.test_output_dir = os.path.join(".", "test_output")
-        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
+        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name) 
 
         self.infile = "{0}.xml.7z".format(self.wiki)
         self.base_call = "../wikiq {0} -o {1}"
 
         self.infile = "{0}.xml.7z".format(self.wiki)
         self.base_call = "../wikiq {0} -o {1}"
@@ -273,6 +277,7 @@ class Test_Stdout(unittest.TestCase):
     def test_noargs(self):
 
         call = self.base_call.format(self.input_file)
     def test_noargs(self):
 
         call = self.base_call.format(self.input_file)
+        print(call)
         proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
         outs = proc.stdout.decode("utf8")
 
         proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
         outs = proc.stdout.decode("utf8")
 
@@ -281,7 +286,119 @@ class Test_Stdout(unittest.TestCase):
         print(baseline_file)
         test = pd.read_table(StringIO(outs))
         baseline = pd.read_table(baseline_file)
         print(baseline_file)
         test = pd.read_table(StringIO(outs))
         baseline = pd.read_table(baseline_file)
-        assert_frame_equal(test,baseline)
+        assert_frame_equal(test,baseline, check_like=True)
+
+class Test_Regex(unittest.TestCase):
+
+    def setUp(self):
+        self.wiki = 'regextest'
+        self.wikiq_out_name = self.wiki + '.tsv'
+        self.infile = "{0}.xml.bz2".format(self.wiki)
+
+        self.input_dir = "dumps"
+        self.input_file = os.path.join(".", self.input_dir,self.infile)
+
+        if not os.path.exists("test_output"):
+            os.mkdir("test_output")
+
+        self.test_output_dir = os.path.join(".", "test_output")
+        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
+        # we have two base calls, one for checking inputs and the other for checking outputs
+        self.base_call = "../wikiq {0}"
+        self.base_call_outs = "../wikiq {0} -o {1}"
+
+        self.baseline_output_dir = "baseline_output"
+
+        # sample inputs for checking that bad inputs get terminated / test_regex_inputs
+        self.bad_inputs_list = [
+            #label is missing            
+            "-RP '\\b\\d+\\b'", 
+            #number of reg and number of labels do not match 
+            "-RP 'NPO V' -RP THE -RPl testlabel",
+            #cp but rp label
+            "-CP '(Tamil|Li)' -RPl testlabel",
+            #regex is missing
+            "-CPl testlabel",
+            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"
+        ]
+
+        # sample inputs for checking the outcomes of good inputs / test_basic_regex
+        self.good_inputs_list = [
+            "-RP '\\b\\d{3}\\b' -RPl threedigits",
+            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
+            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
+            "-CP 'WP:EVADE' -CPl wp_evade"         
+        ]
+
         
         
+        self.cap_inputs_list = [
+            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
+            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"
+        ]
+
+
+
+    def test_regex_inputs(self):
+        for input in self.bad_inputs_list:
+            call = self.base_call.format(self.input_file)
+            call = call + " --stdout " + input
+            print(call)
+            proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
+            stdout,stderr = proc.communicate()
+            #print(proc.returncode)
+            
+            # we want to check that the bad inputs were caught and sys.exit is stopping the code
+            print(stderr.decode("utf-8"))
+            self.assertNotEqual(proc.returncode,0)
+
+    def test_basic_regex(self):
+        for i, input in enumerate(self.good_inputs_list):
+
+            test_filename = "basic_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))
+            #print(test_filename)
+            test_file = os.path.join(self.test_output_dir, test_filename)
+            if os.path.exists(test_file):
+                os.remove(test_file)
+
+            call = self.base_call_outs.format(self.input_file, self.test_output_dir)
+            call = call + " " + input
+            print(call)
+
+            proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
+            proc.wait()
+            copyfile(self.call_output, test_file)
+
+            test = pd.read_table(test_file)
+            
+            baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
+            baseline = pd.read_table(baseline_file)
+            assert_frame_equal(test, baseline, check_like=True)
+            print(i)
+
+
+    def test_capturegroup_regex(self):
+        for i, input in enumerate(self.cap_inputs_list):
+            test_filename = "capturegroup_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))
+            print(test_filename)
+            test_file = os.path.join(self.test_output_dir, test_filename)
+            if os.path.exists(test_file):
+                os.remove(test_file)
+
+            call = self.base_call_outs.format(self.input_file, self.test_output_dir)
+            call = call + " " + input
+            print(call)
+
+            proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
+            proc.wait()
+
+            copyfile(self.call_output, test_file)
+            
+            test = pd.read_table(test_file)
+            
+            baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
+            baseline = pd.read_table(baseline_file)
+            assert_frame_equal(test, baseline, check_like=True)
+
+
 if __name__ == '__main__':
     unittest.main()
 if __name__ == '__main__':
     unittest.main()

Community Data Science Collective || Want to submit a patch?